From 40c50328d0f188575cc4a9d3fc1232f68cc99c3a Mon Sep 17 00:00:00 2001 From: hkc Date: Tue, 9 Jul 2024 14:46:50 +0300 Subject: [PATCH] Added a bunch (almost all) types into manager --- async-bot.py | 20 +-- swarm/manager.py | 365 ++++++++++++++++++++++++++++++++++++++------ swarm/settings.json | 111 ++++++++++++++ 3 files changed, 440 insertions(+), 56 deletions(-) create mode 100644 swarm/settings.json diff --git a/async-bot.py b/async-bot.py index bd4e8c8..7ee8ca0 100644 --- a/async-bot.py +++ b/async-bot.py @@ -353,7 +353,7 @@ async def amain(conf_path: str = "settings.json", *_) -> None: print("AVOID image", avoid["path"]) for elem in settings["elements"]: - if elem["type"] == "text": + if elem["type"] == "text": # XXX migrated mgr.put_text( elem["x"], elem["y"], @@ -363,7 +363,7 @@ async def amain(conf_path: str = "settings.json", *_) -> None: elem.get("negative", False), ) print("ADD text", elem) - elif elem["type"] == "text_anim": + elif elem["type"] == "text_anim": # XXX migrated frames: list[Image.Image] = [] for text in elem["lines"]: frames.append( @@ -379,12 +379,12 @@ async def amain(conf_path: str = "settings.json", *_) -> None: for frame in frames: frame.close() print("ADD text animation", elem) - elif elem["type"] == "image": + elif elem["type"] == "image": # XXX migrated with Image.open(elem["path"]).convert("LA") as im: mgr.put_image(elem["x"], elem["y"], im) print("ADD image", elem) - elif elem["type"] == "time": + elif elem["type"] == "time": # XXX migrated time_format = elem["format"] pos_x, pos_y = elem["x"], elem["y"] font = mgr.get_font( @@ -400,7 +400,7 @@ async def amain(conf_path: str = "settings.json", *_) -> None: return pixmap mgr.animation_functions.append(update_time) - elif elem["type"] == "tile": + elif elem["type"] == "tile": # XXX unused with Image.open(elem["path"]).convert("LA") as im: for oy in range(elem.get("h", im.height)): for ox in range(elem.get("w", im.width)): @@ -410,7 +410,7 @@ async def amain(conf_path: str = "settings.json", *_) -> None: index = x + y * 1000 mgr.put_bit(index, l > 0) print("ADD tile", elem) - elif elem["type"] == "animation": + elif elem["type"] == "animation": # XXX migrated with Image.open(elem["path"]) as anim: frames: list[Image.Image] = [] for frame in ImageSequence.Iterator(anim): @@ -421,7 +421,7 @@ async def amain(conf_path: str = "settings.json", *_) -> None: for frame in frames: frame.close() print("ADD animation", elem) - elif elem["type"] == "rgb111": + elif elem["type"] == "rgb111": # XXX unused ox, oy = elem["x"], elem["y"] with Image.open(elem["path"]).convert("RGBA") as im: for y in range(im.height): @@ -434,7 +434,7 @@ async def amain(conf_path: str = "settings.json", *_) -> None: mgr.put_bit(start_ndx, r > 128) mgr.put_bit(start_ndx + 1, g > 128) mgr.put_bit(start_ndx + 2, b > 128) - elif elem["type"] == "rgb565": + elif elem["type"] == "rgb565": # XXX unused ox, oy = elem["x"], elem["y"] with Image.open(elem["path"]).convert("RGBA") as im: for y in range(im.height): @@ -459,7 +459,7 @@ async def amain(conf_path: str = "settings.json", *_) -> None: for y in range(elem["y"], elem["y"] + elem["h"]): for x in range(elem["x"], elem["x"] + elem["w"]): mgr.put_pixel(x, y, elem["fill"]) - elif elem["type"] == "blob": + elif elem["type"] == "blob": # XXX unused with open(elem["path"], "rb") as fp: offset = elem["offset"] length = elem.get("length", 1000000) @@ -474,7 +474,7 @@ async def amain(conf_path: str = "settings.json", *_) -> None: mgr.put_bit(offset, bool((byte >> (7 - i)) & 1)) written += 1 offset += 1 - elif elem["type"] == "shrek": + elif elem["type"] == "shrek": # XXX: migrated with open(elem["path"], "r") as fp: lyrics = list(map(str.strip, fp)) diff --git a/swarm/manager.py b/swarm/manager.py index 4876f88..f40954e 100644 --- a/swarm/manager.py +++ b/swarm/manager.py @@ -1,5 +1,5 @@ from multiprocessing.shared_memory import SharedMemory -from typing import Optional +from typing import Callable, NamedTuple, NewType, Optional import asyncio import socketio import aiohttp @@ -12,24 +12,33 @@ from PIL import ( ImageSequence, ImageChops, ) -from enum import IntFlag from base64 import b64decode import signal import os import time +import json +import datetime -class PixelMask(IntFlag): - AVOID = 16 - MASK = 32 - FILL = 64 - CHECKED = 128 +class Animation(NamedTuple): + x: int + y: int + frames: list[Image.Image] + spf: float +OFFSET_STATE = 0 +OFFSET_AVOID = 125000 +OFFSET_CANVAS = 250000 +OFFSET_MASK = 375000 + class Manager: - def __init__(self): + def __init__(self, settings_path: str): self.shmem: Optional[SharedMemory] = None self.shmem_name = "omcb-bot" + + self.settings_path = settings_path + self.base = "https://onemillioncheckboxes.com" self.last_update = 0 @@ -37,6 +46,143 @@ class Manager: self.bits_toggled_off = 0 self.last_printout = 0 + self.fonts: dict[tuple[str, int], ImageFont.FreeTypeFont] = {} + self.default_font_size = 8 + self.animations: list[Animation] = [] + self.animation_functions: list[Callable] = [] + + def reload_config(self): + with open(self.settings_path, "r") as fp: + settings = json.load(fp) + + assert self.shmem is not None + + print("Resetting shmem...") + + if fontconfig := settings.get("default_font"): + self.default_font_size = int(fontconfig.get("size", 8)) + self.fonts["default", self.default_font_size] = self.get_font( + fontconfig["path"], + self.default_font_size + ) + + for avoid in settings.get("avoid", []): + if avoid["type"] == "rect": + self.add_avoid_rect( + avoid["x"], avoid["y"], avoid["w"], avoid["h"] + ) + elif avoid["type"] == "range": + self.add_avoid_range( + range(avoid["start"], avoid["stop"], avoid.get("step", 1)) + ) + elif avoid["type"] == "image": + with Image.open(avoid["path"]).convert("LA") as im: + assert im.width == 1000 and im.height == 1000 + for y in range(im.height): + for x in range(im.width): + _, a = im.getpixel((x, y)) # type: ignore + if a > 128: + self.add_avoid_index(x + y * 1000) + else: + raise ValueError(f"invalid avoid: {avoid}") + print("AVOID", avoid) + + for elem in settings.get("elements", []): + if elem["type"].startswith("~"): + continue + elif elem["type"] == "text": + self.put_text( + elem["x"], + elem["y"], + elem["text"], + elem.get("font", "default"), + elem.get("size", self.default_font_size), + elem.get("negative", False), + elem.get("padding", 2), + ) + elif elem["type"] in ("text_anim", "text_animation"): + frames: list[Image.Image] = [] + for line in elem["lines"]: + if isinstance(line, str): + frames.append( + self.make_text_image( + line, + elem.get("font", "default"), + elem.get("size", 8), + elem.get("negative", False), + elem.get("padding", 2), + ) + ) + elif isinstance(line, dict): + frames.append( + self.make_text_image( + line["text"], + line.get("font", elem.get("font", "default")), + line.get("size", elem.get("size", self.default_font_size)), + line.get( + "negative", elem.get("negative", False) + ), + line.get("padding", elem.get("padding", 2)), + ) + ) + else: + raise TypeError(f"invalid line: {line}") + self.put_animation( + elem["x"], elem["y"], frames, elem.get("spf", 10) + ) + elif elem["type"] in ("anim", "animation"): + with Image.open(elem["path"]) as anim: + self.put_animation(elem["x"], elem["y"], [ + frame.convert("LA") + for frame in ImageSequence.Iterator(anim) + ], elem.get("spf", 10)) + elif elem["type"] == "image": + with Image.open(elem["path"]).convert("LA") as im: + self.put_image(elem["x"], elem["y"], im) + elif elem["type"] == "time": + pos_x, pos_y = elem["x"], elem["y"] + + time_format = elem.get("format", "%Y%m%dT%H%M%S UTC") + time_font = elem.get("font", "default") + time_font_size = elem.get("size", self.default_font_size) + time_negative = elem.get("negative", False) + time_outline = elem.get("outline", 2) + + def update_time(): + now = datetime.datetime.now(datetime.timezone.utc) + self.put_image(pos_x, pos_y, self.make_text_image( + now.strftime(time_format), + time_font, + time_font_size, + time_negative, + time_outline + )) + + self.animation_functions.append(update_time) + elif elem["type"] == "shrek": + shrek_x, shrek_y = elem["x"], elem["y"] + + shrek_font = self.get_font(elem.get("font", "default"), elem.get("size", self.default_font_size)) + + with open(elem["path"], "r") as fp: + lyrics = list(map(str.strip, fp)) + + def update_shrek(): + with Image.new("LA", (325, 10), (0, 255)) as im: + draw = ImageDraw.Draw(im) + draw.rectangle((0, 0, 325, 10), fill=(0, 255)) + now = datetime.datetime.now(datetime.timezone.utc) + line = lyrics[ + int(now.timestamp() / elem["spf"]) % len(lyrics) + ] + draw.text((2, -1), line, font=shrek_font, fill=(255, 255)) + self.put_image(shrek_x, shrek_y, im) + + self.animation_functions.append(update_shrek) + else: + raise TypeError(f"invalid element: {elem}") + print("ADD", elem) + async def listener(self): sio = socketio.AsyncClient() sio.on("connect", self.on_connect) @@ -47,17 +193,8 @@ class Manager: await sio.wait() def update_shmem(self, state: bytes): - if not self.shmem: - raise ValueError("shared memory is not initialized yet") - - buffer = bytearray(bytes(1000000)) - for i in range(1000000): - byte, bit = divmod(i, 8) - if state[byte] & (0x80 >> bit): - buffer[i] |= PixelMask.CHECKED - else: - buffer[i] &= ~PixelMask.CHECKED - self.shmem.buf[:] = buffer + assert self.shmem is not None + self.shmem.buf[OFFSET_STATE:OFFSET_STATE + 125000] = state async def on_connect(self): async with aiohttp.ClientSession() as http: @@ -68,15 +205,13 @@ class Manager: self.last_update = data["timestamp"] async def on_full_state(self, data): - if not self.shmem: - raise ValueError("shared memory is not initialized yet") + assert self.shmem is not None buffer = b64decode(data["full_state"].encode() + b"=") self.update_shmem(buffer) self.last_update = data["timestamp"] async def on_batched_bit_toggles(self, data): - if not self.shmem: - raise ValueError("shared memory is not initialized yet") + assert self.shmem is not None bits_on, bits_off, timestamp = data if timestamp < self.last_update: print("old update, ignoring") @@ -86,43 +221,76 @@ class Manager: self.bits_toggled_on += len(bits_on) self.bits_toggled_off = len(bits_off) - for bit in bits_on: - self.shmem.buf[bit] |= PixelMask.CHECKED - for bit in bits_off: - self.shmem.buf[bit] &= ~PixelMask.CHECKED + for ndx in bits_on: + byte, bit = divmod(ndx, 8) + self.shmem.buf[OFFSET_STATE + byte] |= (0x80 >> bit) + for ndx in bits_off: + byte, bit = divmod(ndx, 8) + self.shmem.buf[OFFSET_STATE + byte] &= 0xFF ^ (0x80 >> bit) since_last_printout = time.time() - self.last_printout if since_last_printout >= 5: self.last_printout = time.time() print() - print(f"Toggled on: {self.bits_toggled_on / since_last_printout}/s") - print(f"Toggled off: {self.bits_toggled_off / since_last_printout}/s") + print( + f"Toggled on: {self.bits_toggled_on / since_last_printout}/s" + ) + print( + f"Toggled off: {self.bits_toggled_off / since_last_printout}/s" + ) self.bits_toggled_on = self.bits_toggled_off = 0 def on_sigusr1(self, signum, frame): - if not self.shmem: - raise ValueError("shared memory is not initialized yet") + assert self.shmem is not None print("Caught SIGUSR1, dumping state") buf = bytes(self.shmem.buf[:]) with Image.new("RGB", (1000, 1000), 0) as im: for i in range(1000000): y, x = divmod(i, 1000) - im.putpixel((x, y), ( - 255 if buf[i] & PixelMask.FILL else 0, - 255 if buf[i] & PixelMask.MASK else 0, - 255 if buf[i] & PixelMask.CHECKED else 0 - )) + byte, bit = divmod(i, 8) + im.putpixel( + (x, y), + ( + 255 if (buf[OFFSET_MASK + byte] << bit) & 0x80 else 0, + 255 if (buf[OFFSET_CANVAS + byte] << bit) & 0x80 else 0, + 255 if (buf[OFFSET_STATE + byte] << bit) & 0x80 else 0, + ), + ) im.save("state.png") + with Image.new("L", (1000, 1000), 0) as im: + for i in range(1000000): + y, x = divmod(i, 1000) + byte, bit = divmod(i, 8) + im.putpixel( + (x, y), + 255 if (buf[OFFSET_AVOID + byte] << bit) & 0x80 else 0, + ) + im.save("avoid.png") + print("Dump done") + + + def on_sigusr2(self, signum, frame): + print("Reloading config") + self.reload_config() async def animator(self): while True: + for animation in self.animations: + frame = int(time.time() / animation.spf) % len( + animation.frames + ) + self.put_image( + animation.x, animation.y, animation.frames[frame] + ) + for func in self.animation_functions: + func() await asyncio.sleep(0.1) def add_avoid_range(self, rng: range): - if not self.shmem: - raise ValueError("shared memory is not initialized yet") + assert self.shmem is not None for ndx in rng: - self.shmem.buf[ndx] |= PixelMask.AVOID + byte, bit = divmod(ndx, 8) + self.shmem.buf[OFFSET_AVOID + byte] |= (0x80 >> bit) def add_avoid_rect(self, sx: int, sy: int, w: int, h: int): for y in range(sy, sy + h): @@ -130,10 +298,10 @@ class Manager: self.add_avoid_range(range(sx + ox, sx + w + ox)) def add_avoid_index(self, index: int): - if not self.shmem: - raise ValueError("shared memory is not initialized yet") + assert self.shmem is not None assert 0 <= index < 1000000 - self.shmem.buf[index] |= PixelMask.AVOID + byte, bit = divmod(index, 8) + self.shmem.buf[OFFSET_AVOID + byte] |= (0x80 >> bit) def add_avoid_image(self, im: Image.Image): assert im.width == 1000 @@ -142,11 +310,110 @@ class Manager: y, x = divmod(i, 1000) la = im.getpixel((x, y)) assert isinstance(la, (tuple, list)) and len(la) == 2 - if la[1]: + if la[1] > 128: self.add_avoid_index(i) + def set_index(self, index: int, value: bool): + assert 0 <= index <= 1000000 + assert self.shmem is not None + byte, bit = divmod(index, 8) + self.shmem.buf[OFFSET_MASK + byte] |= (0x80 >> bit) + if value: + self.shmem.buf[OFFSET_CANVAS + byte] |= (0x80 >> bit) + else: + self.shmem.buf[OFFSET_CANVAS + byte] &= 0xFF ^ (0x80 >> bit) + + def clear_index(self, index: int): + assert 0 <= index <= 1000000 + assert self.shmem is not None + byte, bit = divmod(index, 8) + self.shmem.buf[OFFSET_MASK + byte] &= 0xFF ^ (0x80 >> bit) + self.shmem.buf[OFFSET_CANVAS + byte] &= 0xFF ^ (0x80 >> bit) + + def put_image(self, ox: int, oy: int, im: Image.Image): + for y in range(im.height): + for x in range(im.width): + la: tuple[int, int] = im.getpixel((x, y)) # type: ignore + if la[1]: + self.set_index(x + ox + (y + oy) * 1000, la[0] > 0) + + def put_text( + self, + x: int, + y: int, + text: str, + font_name: str = "default", + font_size: int = 8, + negative: bool = False, + outline: int = 2, + ): + self.put_image( + x, + y, + self.make_text_image( + text, font_name, font_size, negative, outline + ), + ) + + def make_text_image( + self, + text: str, + font_name: str = "default", + font_size: int = 8, + negative: bool = False, + outline: int = 2, + ): + font = self.get_font(font_name, font_size) + left, top, right, bottom = font.getbbox(text, anchor="lt") + with Image.new( + "RGBA", + (int(right - left) + outline * 2, int(bottom - top) + outline * 2), + 0, + ) as im: + draw = ImageDraw.Draw(im) + draw.rectangle((0, 0, im.width, im.height), (0, 0, 0, 0)) + draw.text( + (left + outline, top + outline), + text, + font=font, + fill=(255, 255, 255, 255), + anchor="lt", + ) + + alpha = im.convert("L").filter( + ImageFilter.MaxFilter(outline * 2 + 1) + ) + im.putalpha(alpha) + if negative: + im = ImageChops.invert(im) + im.putalpha(alpha) # ty PIL + return im.convert("LA") + + def put_animation( + self, x: int, y: int, frames: list[Image.Image], spf: float = 10 + ): + self.animations.append(Animation(x, y, frames, spf)) + + def get_font( + self, font_name: str, font_size: int + ) -> ImageFont.FreeTypeFont: + if font := self.fonts.get((font_name, font_size)): + return font + print("FONT", font_name, font_size) + font = ImageFont.truetype(font_name, font_size) + self.fonts[font_name, font_size] = font + return font + async def __aenter__(self): - self.shmem = SharedMemory(self.shmem_name, True, 1000000) + print("Acquiring shared memory") + self.shmem = SharedMemory(self.shmem_name, True, 500000) + print("Loading config...") + try: + self.reload_config() + except Exception: + self.shmem.close() + self.shmem.unlink() + raise return self async def __aexit__(self, a, b, c): @@ -158,9 +425,15 @@ class Manager: async def main(): print(f"PID: {os.getpid()}") - async with Manager() as mgr: + async with Manager("./settings.json") as mgr: signal.signal(signal.SIGUSR1, mgr.on_sigusr1) - await mgr.listener() + signal.signal(signal.SIGUSR2, mgr.on_sigusr2) + + print("Listening...") + await asyncio.gather( + mgr.listener(), + mgr.animator() + ) if __name__ == "__main__": diff --git a/swarm/settings.json b/swarm/settings.json new file mode 100644 index 0000000..82aae10 --- /dev/null +++ b/swarm/settings.json @@ -0,0 +1,111 @@ +{ + "default_font": { + "path": "../ic8x8u.ttf", + "size": 8 + }, + "avoid": [ + { + "type": "rect", + "x": 0, + "y": 0, + "w": 1000, + "h": 10, + "description": "Not ruining fun for normal users" + }, + { + "type": "rect", + "x": 0, + "y": 20, + "w": 1000, + "h": 80, + "description": "The VOID" + }, + { + "type": "rect", + "x": 0, + "y": 750, + "w": 123, + "h": 123, + "description": "catgirls.win QR code" + }, + { + "type": "range", + "start": 900000, + "stop": 1000000, + "description": "catgirls.win text (both b64 and plain)" + }, + { + "type": "image", + "path": "../avoid_masks/noita.png", + "description": "Noita logo by Cr4xy" + } + ], + "elements": [ + { + "type": "time", + "x": 75, + "y": 100, + "format": "And time is: %Y-%m-%d %H:%M:%S UTC", + "spf": 20, + "font": "/usr/share/fonts/TTF/TerminusTTF.ttf", + "size": 12 + }, + { + "type": "image", + "path": "../pictures/casey.png", + "x": 0, + "y": 128 + }, + { + "type": "animation", + "path": "../pictures/neko.gif", + "spf": 30, + "x": 625, + "y": 496 + }, + { + "type": "image", + "path": "../pictures/casey_qr.png", + "x": 10, + "y": 240 + }, + { + "type": "image", + "path": "../pictures/hueh.png", + "x": 490, + "y": 810 + }, + { + "type": "text_anim", + "font": "/usr/share/fonts/TTF/TerminusTTF.ttf", + "size": 18, + "lines": [ + "крипер2004", + "Крипер2004", + "КРипер2004", + "КРИпер2004", + "КРИПер2004", + "КРИПЕр2004", + "КРИПЕР2004", + "кРИПЕР2004", + "крИПЕР2004", + "криПЕР2004", + "крипЕР2004", + "крипеР2004", + "крипер2004" + ], + "spf": 30, + "x": 3, + "y": 872 + }, + { + "type": "shrek", + "font": "../creep2.ttf", + "size": 11, + "path": "../funnies/shrek.txt", + "x": 490, + "y": 700, + "spf": 120 + } + ] +}