from multiprocessing.shared_memory import SharedMemory from typing import Callable, NamedTuple, Optional import asyncio import socketio import aiohttp from PIL import ( Image, ImageFont, ImageDraw, ImageFilter, ImageSequence, ImageChops, ) from base64 import b64decode import signal import os import time import json import datetime class Animation(NamedTuple): x: int y: int frames: list[Image.Image] spf: float offset: int = 0 OFFSET_STATE = 0 OFFSET_AVOID = 125000 OFFSET_CANVAS = 250000 OFFSET_MASK = 375000 class Manager: def __init__(self, settings_path: str): self.shmem: Optional[SharedMemory] = None self.shmem_name = "omcb-bot" self.settings_path = settings_path self.base = "https://onemillioncheckboxes.com" self.last_update = 0 self.bits_toggled_on = 0 self.bits_toggled_off = 0 self.last_printout = 0 self.fonts: dict[tuple[str, int], ImageFont.FreeTypeFont] = {} self.default_font_size = 8 self.animations: list[Animation] = [] self.animation_functions: list[Callable] = [] def reload_config(self): with open(self.settings_path, "r") as fp: settings = json.load(fp) assert self.shmem is not None print("Resetting shmem...") self.shmem.buf[OFFSET_AVOID:] = bytes(500000 - OFFSET_AVOID) self.animations = [] self.animation_functions = [] if fontconfig := settings.get("default_font"): self.default_font_size = int(fontconfig.get("size", 8)) self.fonts["default", self.default_font_size] = self.get_font( fontconfig["path"], self.default_font_size ) for avoid in settings.get("avoid", []): if avoid["type"].startswith("~"): continue if avoid["type"] == "rect": self.add_avoid_rect( avoid["x"], avoid["y"], avoid["w"], avoid["h"] ) elif avoid["type"] == "range": self.add_avoid_range( range(avoid["start"], avoid["stop"], avoid.get("step", 1)) ) elif avoid["type"] == "image": with Image.open(avoid["path"]).convert("LA") as im: assert im.width == 1000 and im.height == 1000 pixels = im.load() assert pixels is not None for y in range(im.height): for x in range(im.width): _, a = pixels[(x, y)] # type: ignore if a > 128: self.add_avoid_index(x + y * 1000) else: raise ValueError(f"invalid avoid: {avoid}") print("AVOID", avoid) for elem in settings.get("elements", []): if elem["type"].startswith("~"): continue elif elem["type"] == "rect": with Image.new( "LA", (elem["w"], elem["h"]), ((255 if elem["fill"] else 0), 255), ) as im: self.put_image(elem["x"], elem["y"], im) elif elem["type"] == "text": self.put_text( elem["x"], elem["y"], elem["text"], elem.get("font", "default"), elem.get("size", self.default_font_size), elem.get("negative", False), elem.get("padding", 2), ) elif elem["type"] in ("text_anim", "text_animation"): frames: list[Image.Image] = [] for line in elem["lines"]: if isinstance(line, str): frames.append( self.make_text_image( line, elem.get("font", "default"), elem.get("size", 8), elem.get("negative", False), elem.get("padding", 2), ) ) elif isinstance(line, dict): frames.append( self.make_text_image( line["text"], line.get("font", elem.get("font", "default")), line.get( "size", elem.get("size", self.default_font_size), ), line.get( "negative", elem.get("negative", False) ), line.get("padding", elem.get("padding", 2)), ) ) else: raise TypeError(f"invalid line: {line}") self.put_animation( elem["x"], elem["y"], frames, elem.get("spf", 10) ) elif elem["type"] in ("anim", "animation"): with Image.open(elem["path"]) as anim: self.put_animation( elem["x"], elem["y"], [ frame.convert("LA") for frame in ImageSequence.Iterator(anim) ], elem.get("spf", 10), elem.get("offset", 0) ) elif elem["type"] == "image": with Image.open(elem["path"]).convert("LA") as im: self.put_image(elem["x"], elem["y"], im) elif elem["type"] == "time": pos_x, pos_y = elem["x"], elem["y"] time_format = elem.get("format", "%Y%m%dT%H%M%S UTC") time_font = elem.get("font", "default") time_font_size = elem.get("size", self.default_font_size) time_negative = elem.get("negative", False) time_outline = elem.get("outline", 2) def update_time(): now = datetime.datetime.now(datetime.timezone.utc) self.put_image( pos_x, pos_y, self.make_text_image( now.strftime(time_format), time_font, time_font_size, time_negative, time_outline, ), ) self.animation_functions.append(update_time) elif elem["type"] == "shrek": shrek_x, shrek_y = elem["x"], elem["y"] shrek_font = self.get_font( elem.get("font", "default"), elem.get("size", self.default_font_size), ) shrek_spf = elem.get("spf", 10) with open(elem["path"], "r") as fp: lyrics = list(map(str.strip, fp)) def update_shrek(): with Image.new("LA", (325, 10), (0, 255)) as im: draw = ImageDraw.Draw(im) draw.rectangle((0, 0, 325, 10), fill=(0, 255)) now = datetime.datetime.now(datetime.timezone.utc) line = lyrics[ int(now.timestamp() / shrek_spf) % len(lyrics) ] draw.text( (2, -1), line, font=shrek_font, fill=(255, 255) ) self.put_image(shrek_x, shrek_y, im) self.animation_functions.append(update_shrek) elif elem["type"] == "minimap": minimap_x, minimap_y = elem["x"], elem["y"] minimap_w, minimap_h = elem["w"], elem["h"] minimap_state = { "last_update": time.time() } def update_minimap(): if self.shmem is None: return if (time.time() - minimap_state["last_update"]) < 5: return for y in range(minimap_h): for x in range(minimap_w): world_x = int(x * 1000 / minimap_w) world_y = int(y * 1000 / minimap_h) world_i = world_x + world_y * 1000 byte, mask = world_i >> 3, 0x80 >> (world_i & 7) state = bool(self.shmem.buf[byte] & mask) px, py = x + minimap_x, y + minimap_y self.set_index(px + py * 1000, state) minimap_state["last_update"] = time.time() self.animation_functions.append(update_minimap) else: raise TypeError(f"invalid element: {elem}") print("ADD", elem) async def listener(self): sio = socketio.AsyncClient() sio.on("connect", self.on_connect) sio.on("batched_bit_toggles", self.on_batched_bit_toggles) sio.on("full_state", self.on_full_state) await sio.connect(self.base.replace("http", "ws")) await sio.wait() def update_shmem(self, state: bytes): assert self.shmem is not None self.shmem.buf[OFFSET_STATE : OFFSET_STATE + 125000] = state async def on_connect(self): async with aiohttp.ClientSession() as http: async with http.get(f"{self.base}/api/initial-state") as req: data = await req.json() buffer = b64decode(data["full_state"].encode() + b"=") self.update_shmem(buffer) self.last_update = data["timestamp"] async def on_full_state(self, data): assert self.shmem is not None buffer = b64decode(data["full_state"].encode() + b"=") self.update_shmem(buffer) self.last_update = data["timestamp"] async def on_batched_bit_toggles(self, data): assert self.shmem is not None bits_on, bits_off, timestamp = data if timestamp < self.last_update: print("old update, ignoring") self.last_update = timestamp self.bits_toggled_on += len(bits_on) self.bits_toggled_off += len(bits_off) for ndx in bits_on: byte, bit = ndx >> 3, ndx & 7 self.shmem.buf[OFFSET_STATE + byte] |= 0x80 >> bit for ndx in bits_off: byte, bit = ndx >> 3, ndx & 7 self.shmem.buf[OFFSET_STATE + byte] &= 0xFF ^ (0x80 >> bit) since_last_printout = time.time() - self.last_printout if since_last_printout >= 5: self.last_printout = time.time() print() print( f"Toggled on: {self.bits_toggled_on / since_last_printout}/s" ) print( f"Toggled off: {self.bits_toggled_off / since_last_printout}/s" ) self.bits_toggled_on = self.bits_toggled_off = 0 def on_sigusr1(self, signum, frame): assert self.shmem is not None print("Caught SIGUSR1, dumping state") buf = bytes(self.shmem.buf[:]) print("# Dumping state.png") with Image.new("RGB", (1000, 1000), 0) as im: for i in range(1000000): y, x = divmod(i, 1000) byte, bit = i >> 3, i & 7 im.putpixel( (x, y), ( 255 if (buf[OFFSET_MASK + byte] << bit) & 0x80 else 0, ( 255 if (buf[OFFSET_CANVAS + byte] << bit) & 0x80 else 0 ), 255 if (buf[OFFSET_STATE + byte] << bit) & 0x80 else 0, ), ) im.save("state.png") print("# Dumping avoid.png") with Image.new("L", (1000, 1000), 0) as im: for i in range(1000000): y, x = divmod(i, 1000) byte, bit = i >> 3, i & 7 im.putpixel( (x, y), 255 if (buf[OFFSET_AVOID + byte] << bit) & 0x80 else 0, ) im.save("avoid.png") print(f"# Animations {len(self.animations)}") for anim in self.animations: frame = int(time.time() / anim.spf + anim.offset) % len(anim.frames) print( str.format( "X: {:4d} Y: {:4d} Frames: {:3d} S/F: {:4.1f} Frame: {:d}", anim.x, anim.y, len(anim.frames), anim.spf, frame, ) ) print("Dump done") def on_sigusr2(self, signum, frame): print("Reloading config") self.reload_config() async def animator(self): while True: for animation in self.animations: frame = int(time.time() / animation.spf + animation.offset) % len( animation.frames ) self.put_image( animation.x, animation.y, animation.frames[frame] ) for func in self.animation_functions: func() await asyncio.sleep(0.1) def add_avoid_range(self, rng: range): assert self.shmem is not None for ndx in rng: byte, bit = ndx >> 3, ndx & 7 self.shmem.buf[OFFSET_AVOID + byte] |= 0x80 >> bit def add_avoid_rect(self, sx: int, sy: int, w: int, h: int): for y in range(sy, sy + h): ox = y * 1000 self.add_avoid_range(range(sx + ox, sx + w + ox)) def add_avoid_index(self, index: int): assert self.shmem is not None assert 0 <= index < 1000000 byte, bit = index >> 3, index & 7 self.shmem.buf[OFFSET_AVOID + byte] |= 0x80 >> bit def add_avoid_image(self, im: Image.Image): assert im.width == 1000 assert im.height == 1000 pixels = im.load() assert pixels is not None for i in range(1000000): y, x = divmod(i, 1000) la = pixels[(x, y)] assert isinstance(la, (tuple, list)) and len(la) == 2 if la[1] > 128: self.add_avoid_index(i) def set_index(self, index: int, value: bool): assert 0 <= index <= 1000000 assert self.shmem is not None byte, bit = index >> 3, index & 7 self.shmem.buf[OFFSET_MASK + byte] |= 0x80 >> bit if value: self.shmem.buf[OFFSET_CANVAS + byte] |= 0x80 >> bit else: self.shmem.buf[OFFSET_CANVAS + byte] &= 0xFF ^ (0x80 >> bit) def clear_index(self, index: int): assert 0 <= index <= 1000000 assert self.shmem is not None byte, bit = index >> 3, index & 7 self.shmem.buf[OFFSET_MASK + byte] &= 0xFF ^ (0x80 >> bit) self.shmem.buf[OFFSET_CANVAS + byte] &= 0xFF ^ (0x80 >> bit) def put_image(self, ox: int, oy: int, im: Image.Image): pixels = im.load() assert pixels is not None for y in range(im.height): for x in range(im.width): la: tuple[int, int] = pixels[(x, y)] # type: ignore if la[1]: self.set_index(x + ox + (y + oy) * 1000, la[0] > 0) def put_text( self, x: int, y: int, text: str, font_name: str = "default", font_size: int = 8, negative: bool = False, outline: int = 2, ): self.put_image( x, y, self.make_text_image( text, font_name, font_size, negative, outline ), ) def make_text_image( self, text: str, font_name: str = "default", font_size: int = 8, negative: bool = False, outline: int = 2, ): font = self.get_font(font_name, font_size) left, top, right, bottom = font.getbbox(text, anchor="lt") with Image.new( "RGBA", (int(right - left) + outline * 2, int(bottom - top) + outline * 2), 0, ) as im: draw = ImageDraw.Draw(im) draw.rectangle((0, 0, im.width, im.height), (0, 0, 0, 0)) draw.text( (left + outline, top + outline), text, font=font, fill=(255, 255, 255, 255), anchor="lt", ) alpha = im.convert("L").filter( ImageFilter.MaxFilter(outline * 2 + 1) ) im.putalpha(alpha) if negative: im = ImageChops.invert(im) im.putalpha(alpha) # ty PIL return im.convert("LA") def put_animation( self, x: int, y: int, frames: list[Image.Image], spf: float = 10, offset: int = 0 ): self.animations.append(Animation(x, y, frames, spf, offset)) def get_font( self, font_name: str, font_size: int ) -> ImageFont.FreeTypeFont: if font := self.fonts.get((font_name, font_size)): return font print("FONT", font_name, font_size) font = ImageFont.truetype(font_name, font_size) self.fonts[font_name, font_size] = font return font async def __aenter__(self): print("Acquiring shared memory") self.shmem = SharedMemory(self.shmem_name, True, 500000) print("Loading config...") try: self.reload_config() except Exception: self.shmem.close() self.shmem.unlink() raise return self async def __aexit__(self, a, b, c): if self.shmem: print("cleaning up shmem") self.shmem.close() self.shmem.unlink() async def main(cfg_path: str = "./settings.json", *_): print(f"PID: {os.getpid()}") async with Manager(cfg_path) as mgr: signal.signal(signal.SIGUSR1, mgr.on_sigusr1) signal.signal(signal.SIGUSR2, mgr.on_sigusr2) print("Listening...") await asyncio.gather(mgr.listener(), mgr.animator()) if __name__ == "__main__": from sys import argv asyncio.run(main(*argv[1:]))