518 lines
18 KiB
Python
518 lines
18 KiB
Python
from multiprocessing.shared_memory import SharedMemory
|
|
from typing import Callable, NamedTuple, Optional
|
|
import asyncio
|
|
import socketio
|
|
import aiohttp
|
|
from PIL import (
|
|
Image,
|
|
ImageFont,
|
|
ImageDraw,
|
|
ImageFilter,
|
|
ImageSequence,
|
|
ImageChops,
|
|
)
|
|
from base64 import b64decode
|
|
import signal
|
|
import os
|
|
import time
|
|
import json
|
|
import datetime
|
|
|
|
|
|
class Animation(NamedTuple):
|
|
x: int
|
|
y: int
|
|
frames: list[Image.Image]
|
|
spf: float
|
|
|
|
|
|
OFFSET_STATE = 0
|
|
OFFSET_AVOID = 125000
|
|
OFFSET_CANVAS = 250000
|
|
OFFSET_MASK = 375000
|
|
|
|
|
|
class Manager:
|
|
def __init__(self, settings_path: str):
|
|
self.shmem: Optional[SharedMemory] = None
|
|
self.shmem_name = "omcb-bot"
|
|
|
|
self.settings_path = settings_path
|
|
|
|
self.base = "https://onemillioncheckboxes.com"
|
|
self.last_update = 0
|
|
|
|
self.bits_toggled_on = 0
|
|
self.bits_toggled_off = 0
|
|
self.last_printout = 0
|
|
|
|
self.fonts: dict[tuple[str, int], ImageFont.FreeTypeFont] = {}
|
|
self.default_font_size = 8
|
|
self.animations: list[Animation] = []
|
|
self.animation_functions: list[Callable] = []
|
|
|
|
def reload_config(self):
|
|
with open(self.settings_path, "r") as fp:
|
|
settings = json.load(fp)
|
|
|
|
assert self.shmem is not None
|
|
|
|
print("Resetting shmem...")
|
|
self.shmem.buf[OFFSET_AVOID:] = bytes(500000 - OFFSET_AVOID)
|
|
self.animations = []
|
|
self.animation_functions = []
|
|
|
|
if fontconfig := settings.get("default_font"):
|
|
self.default_font_size = int(fontconfig.get("size", 8))
|
|
self.fonts["default", self.default_font_size] = self.get_font(
|
|
fontconfig["path"], self.default_font_size
|
|
)
|
|
|
|
for avoid in settings.get("avoid", []):
|
|
if avoid["type"].startswith("~"):
|
|
continue
|
|
if avoid["type"] == "rect":
|
|
self.add_avoid_rect(
|
|
avoid["x"], avoid["y"], avoid["w"], avoid["h"]
|
|
)
|
|
elif avoid["type"] == "range":
|
|
self.add_avoid_range(
|
|
range(avoid["start"], avoid["stop"], avoid.get("step", 1))
|
|
)
|
|
elif avoid["type"] == "image":
|
|
with Image.open(avoid["path"]).convert("LA") as im:
|
|
assert im.width == 1000 and im.height == 1000
|
|
pixels = im.load()
|
|
assert pixels is not None
|
|
for y in range(im.height):
|
|
for x in range(im.width):
|
|
_, a = pixels[(x, y)] # type: ignore
|
|
if a > 128:
|
|
self.add_avoid_index(x + y * 1000)
|
|
else:
|
|
raise ValueError(f"invalid avoid: {avoid}")
|
|
print("AVOID", avoid)
|
|
|
|
for elem in settings.get("elements", []):
|
|
if elem["type"].startswith("~"):
|
|
continue
|
|
elif elem["type"] == "rect":
|
|
with Image.new(
|
|
"LA",
|
|
(elem["w"], elem["h"]),
|
|
((255 if elem["fill"] else 0), 255),
|
|
) as im:
|
|
self.put_image(elem["x"], elem["y"], im)
|
|
elif elem["type"] == "text":
|
|
self.put_text(
|
|
elem["x"],
|
|
elem["y"],
|
|
elem["text"],
|
|
elem.get("font", "default"),
|
|
elem.get("size", self.default_font_size),
|
|
elem.get("negative", False),
|
|
elem.get("padding", 2),
|
|
)
|
|
elif elem["type"] in ("text_anim", "text_animation"):
|
|
frames: list[Image.Image] = []
|
|
for line in elem["lines"]:
|
|
if isinstance(line, str):
|
|
frames.append(
|
|
self.make_text_image(
|
|
line,
|
|
elem.get("font", "default"),
|
|
elem.get("size", 8),
|
|
elem.get("negative", False),
|
|
elem.get("padding", 2),
|
|
)
|
|
)
|
|
elif isinstance(line, dict):
|
|
frames.append(
|
|
self.make_text_image(
|
|
line["text"],
|
|
line.get("font", elem.get("font", "default")),
|
|
line.get(
|
|
"size",
|
|
elem.get("size", self.default_font_size),
|
|
),
|
|
line.get(
|
|
"negative", elem.get("negative", False)
|
|
),
|
|
line.get("padding", elem.get("padding", 2)),
|
|
)
|
|
)
|
|
else:
|
|
raise TypeError(f"invalid line: {line}")
|
|
self.put_animation(
|
|
elem["x"], elem["y"], frames, elem.get("spf", 10)
|
|
)
|
|
elif elem["type"] in ("anim", "animation"):
|
|
with Image.open(elem["path"]) as anim:
|
|
self.put_animation(
|
|
elem["x"],
|
|
elem["y"],
|
|
[
|
|
frame.convert("LA")
|
|
for frame in ImageSequence.Iterator(anim)
|
|
],
|
|
elem.get("spf", 10),
|
|
)
|
|
elif elem["type"] == "image":
|
|
with Image.open(elem["path"]).convert("LA") as im:
|
|
self.put_image(elem["x"], elem["y"], im)
|
|
elif elem["type"] == "time":
|
|
pos_x, pos_y = elem["x"], elem["y"]
|
|
|
|
time_format = elem.get("format", "%Y%m%dT%H%M%S UTC")
|
|
time_font = elem.get("font", "default")
|
|
time_font_size = elem.get("size", self.default_font_size)
|
|
time_negative = elem.get("negative", False)
|
|
time_outline = elem.get("outline", 2)
|
|
|
|
def update_time():
|
|
now = datetime.datetime.now(datetime.timezone.utc)
|
|
self.put_image(
|
|
pos_x,
|
|
pos_y,
|
|
self.make_text_image(
|
|
now.strftime(time_format),
|
|
time_font,
|
|
time_font_size,
|
|
time_negative,
|
|
time_outline,
|
|
),
|
|
)
|
|
|
|
self.animation_functions.append(update_time)
|
|
elif elem["type"] == "shrek":
|
|
shrek_x, shrek_y = elem["x"], elem["y"]
|
|
|
|
shrek_font = self.get_font(
|
|
elem.get("font", "default"),
|
|
elem.get("size", self.default_font_size),
|
|
)
|
|
|
|
shrek_spf = elem.get("spf", 10)
|
|
|
|
with open(elem["path"], "r") as fp:
|
|
lyrics = list(map(str.strip, fp))
|
|
|
|
def update_shrek():
|
|
with Image.new("LA", (325, 10), (0, 255)) as im:
|
|
draw = ImageDraw.Draw(im)
|
|
draw.rectangle((0, 0, 325, 10), fill=(0, 255))
|
|
now = datetime.datetime.now(datetime.timezone.utc)
|
|
line = lyrics[
|
|
int(now.timestamp() / shrek_spf) % len(lyrics)
|
|
]
|
|
draw.text(
|
|
(2, -1), line, font=shrek_font, fill=(255, 255)
|
|
)
|
|
self.put_image(shrek_x, shrek_y, im)
|
|
|
|
self.animation_functions.append(update_shrek)
|
|
elif elem["type"] == "minimap":
|
|
minimap_x, minimap_y = elem["x"], elem["y"]
|
|
minimap_w, minimap_h = elem["w"], elem["h"]
|
|
|
|
minimap_state = {
|
|
"last_update": time.time()
|
|
}
|
|
def update_minimap():
|
|
if self.shmem is None:
|
|
return
|
|
if (time.time() - minimap_state["last_update"]) < 5:
|
|
return
|
|
for y in range(minimap_h):
|
|
for x in range(minimap_w):
|
|
world_x = int(x * 1000 / minimap_w)
|
|
world_y = int(y * 1000 / minimap_h)
|
|
world_i = world_x + world_y * 1000
|
|
byte, mask = world_i >> 3, 0x80 >> (world_i & 7)
|
|
state = bool(self.shmem.buf[byte] & mask)
|
|
px, py = x + minimap_x, y + minimap_y
|
|
self.set_index(px + py * 1000, state)
|
|
minimap_state["last_update"] = time.time()
|
|
|
|
self.animation_functions.append(update_minimap)
|
|
else:
|
|
raise TypeError(f"invalid element: {elem}")
|
|
print("ADD", elem)
|
|
|
|
async def listener(self):
|
|
sio = socketio.AsyncClient()
|
|
sio.on("connect", self.on_connect)
|
|
sio.on("batched_bit_toggles", self.on_batched_bit_toggles)
|
|
sio.on("full_state", self.on_full_state)
|
|
|
|
await sio.connect(self.base.replace("http", "ws"))
|
|
await sio.wait()
|
|
|
|
def update_shmem(self, state: bytes):
|
|
assert self.shmem is not None
|
|
self.shmem.buf[OFFSET_STATE : OFFSET_STATE + 125000] = state
|
|
|
|
async def on_connect(self):
|
|
async with aiohttp.ClientSession() as http:
|
|
async with http.get(f"{self.base}/api/initial-state") as req:
|
|
data = await req.json()
|
|
buffer = b64decode(data["full_state"].encode() + b"=")
|
|
self.update_shmem(buffer)
|
|
self.last_update = data["timestamp"]
|
|
|
|
async def on_full_state(self, data):
|
|
assert self.shmem is not None
|
|
buffer = b64decode(data["full_state"].encode() + b"=")
|
|
self.update_shmem(buffer)
|
|
self.last_update = data["timestamp"]
|
|
|
|
async def on_batched_bit_toggles(self, data):
|
|
assert self.shmem is not None
|
|
bits_on, bits_off, timestamp = data
|
|
if timestamp < self.last_update:
|
|
print("old update, ignoring")
|
|
|
|
self.last_update = timestamp
|
|
|
|
self.bits_toggled_on += len(bits_on)
|
|
self.bits_toggled_off = len(bits_off)
|
|
|
|
for ndx in bits_on:
|
|
byte, bit = ndx >> 3, ndx & 7
|
|
self.shmem.buf[OFFSET_STATE + byte] |= 0x80 >> bit
|
|
for ndx in bits_off:
|
|
byte, bit = ndx >> 3, ndx & 7
|
|
self.shmem.buf[OFFSET_STATE + byte] &= 0xFF ^ (0x80 >> bit)
|
|
|
|
since_last_printout = time.time() - self.last_printout
|
|
if since_last_printout >= 5:
|
|
self.last_printout = time.time()
|
|
print()
|
|
print(
|
|
f"Toggled on: {self.bits_toggled_on / since_last_printout}/s"
|
|
)
|
|
print(
|
|
f"Toggled off: {self.bits_toggled_off / since_last_printout}/s"
|
|
)
|
|
self.bits_toggled_on = self.bits_toggled_off = 0
|
|
|
|
def on_sigusr1(self, signum, frame):
|
|
assert self.shmem is not None
|
|
print("Caught SIGUSR1, dumping state")
|
|
buf = bytes(self.shmem.buf[:])
|
|
print("# Dumping state.png")
|
|
with Image.new("RGB", (1000, 1000), 0) as im:
|
|
for i in range(1000000):
|
|
y, x = divmod(i, 1000)
|
|
byte, bit = i >> 3, i & 7
|
|
im.putpixel(
|
|
(x, y),
|
|
(
|
|
255 if (buf[OFFSET_MASK + byte] << bit) & 0x80 else 0,
|
|
(
|
|
255
|
|
if (buf[OFFSET_CANVAS + byte] << bit) & 0x80
|
|
else 0
|
|
),
|
|
255 if (buf[OFFSET_STATE + byte] << bit) & 0x80 else 0,
|
|
),
|
|
)
|
|
im.save("state.png")
|
|
print("# Dumping avoid.png")
|
|
with Image.new("L", (1000, 1000), 0) as im:
|
|
for i in range(1000000):
|
|
y, x = divmod(i, 1000)
|
|
byte, bit = i >> 3, i & 7
|
|
im.putpixel(
|
|
(x, y),
|
|
255 if (buf[OFFSET_AVOID + byte] << bit) & 0x80 else 0,
|
|
)
|
|
im.save("avoid.png")
|
|
print(f"# Animations {len(self.animations)}")
|
|
for anim in self.animations:
|
|
frame = int(time.time() / anim.spf) % len(anim.frames)
|
|
print(
|
|
str.format(
|
|
"X: {:4d} Y: {:4d} Frames: {:3d} S/F: {:4.1f} Frame: {:d}",
|
|
anim.x,
|
|
anim.y,
|
|
len(anim.frames),
|
|
anim.spf,
|
|
frame,
|
|
)
|
|
)
|
|
print("Dump done")
|
|
|
|
def on_sigusr2(self, signum, frame):
|
|
print("Reloading config")
|
|
self.reload_config()
|
|
|
|
async def animator(self):
|
|
while True:
|
|
for animation in self.animations:
|
|
frame = int(time.time() / animation.spf) % len(
|
|
animation.frames
|
|
)
|
|
self.put_image(
|
|
animation.x, animation.y, animation.frames[frame]
|
|
)
|
|
for func in self.animation_functions:
|
|
func()
|
|
await asyncio.sleep(0.1)
|
|
|
|
def add_avoid_range(self, rng: range):
|
|
assert self.shmem is not None
|
|
for ndx in rng:
|
|
byte, bit = ndx >> 3, ndx & 7
|
|
self.shmem.buf[OFFSET_AVOID + byte] |= 0x80 >> bit
|
|
|
|
def add_avoid_rect(self, sx: int, sy: int, w: int, h: int):
|
|
for y in range(sy, sy + h):
|
|
ox = y * 1000
|
|
self.add_avoid_range(range(sx + ox, sx + w + ox))
|
|
|
|
def add_avoid_index(self, index: int):
|
|
assert self.shmem is not None
|
|
assert 0 <= index < 1000000
|
|
byte, bit = index >> 3, index & 7
|
|
self.shmem.buf[OFFSET_AVOID + byte] |= 0x80 >> bit
|
|
|
|
def add_avoid_image(self, im: Image.Image):
|
|
assert im.width == 1000
|
|
assert im.height == 1000
|
|
pixels = im.load()
|
|
assert pixels is not None
|
|
for i in range(1000000):
|
|
y, x = divmod(i, 1000)
|
|
la = pixels[(x, y)]
|
|
assert isinstance(la, (tuple, list)) and len(la) == 2
|
|
if la[1] > 128:
|
|
self.add_avoid_index(i)
|
|
|
|
def set_index(self, index: int, value: bool):
|
|
assert 0 <= index <= 1000000
|
|
assert self.shmem is not None
|
|
byte, bit = index >> 3, index & 7
|
|
self.shmem.buf[OFFSET_MASK + byte] |= 0x80 >> bit
|
|
if value:
|
|
self.shmem.buf[OFFSET_CANVAS + byte] |= 0x80 >> bit
|
|
else:
|
|
self.shmem.buf[OFFSET_CANVAS + byte] &= 0xFF ^ (0x80 >> bit)
|
|
|
|
def clear_index(self, index: int):
|
|
assert 0 <= index <= 1000000
|
|
assert self.shmem is not None
|
|
byte, bit = index >> 3, index & 7
|
|
self.shmem.buf[OFFSET_MASK + byte] &= 0xFF ^ (0x80 >> bit)
|
|
self.shmem.buf[OFFSET_CANVAS + byte] &= 0xFF ^ (0x80 >> bit)
|
|
|
|
def put_image(self, ox: int, oy: int, im: Image.Image):
|
|
pixels = im.load()
|
|
assert pixels is not None
|
|
for y in range(im.height):
|
|
for x in range(im.width):
|
|
la: tuple[int, int] = pixels[(x, y)] # type: ignore
|
|
if la[1]:
|
|
self.set_index(x + ox + (y + oy) * 1000, la[0] > 0)
|
|
|
|
def put_text(
|
|
self,
|
|
x: int,
|
|
y: int,
|
|
text: str,
|
|
font_name: str = "default",
|
|
font_size: int = 8,
|
|
negative: bool = False,
|
|
outline: int = 2,
|
|
):
|
|
self.put_image(
|
|
x,
|
|
y,
|
|
self.make_text_image(
|
|
text, font_name, font_size, negative, outline
|
|
),
|
|
)
|
|
|
|
def make_text_image(
|
|
self,
|
|
text: str,
|
|
font_name: str = "default",
|
|
font_size: int = 8,
|
|
negative: bool = False,
|
|
outline: int = 2,
|
|
):
|
|
font = self.get_font(font_name, font_size)
|
|
left, top, right, bottom = font.getbbox(text, anchor="lt")
|
|
with Image.new(
|
|
"RGBA",
|
|
(int(right - left) + outline * 2, int(bottom - top) + outline * 2),
|
|
0,
|
|
) as im:
|
|
draw = ImageDraw.Draw(im)
|
|
draw.rectangle((0, 0, im.width, im.height), (0, 0, 0, 0))
|
|
draw.text(
|
|
(left + outline, top + outline),
|
|
text,
|
|
font=font,
|
|
fill=(255, 255, 255, 255),
|
|
anchor="lt",
|
|
)
|
|
|
|
alpha = im.convert("L").filter(
|
|
ImageFilter.MaxFilter(outline * 2 + 1)
|
|
)
|
|
im.putalpha(alpha)
|
|
if negative:
|
|
im = ImageChops.invert(im)
|
|
im.putalpha(alpha) # ty PIL
|
|
return im.convert("LA")
|
|
|
|
def put_animation(
|
|
self, x: int, y: int, frames: list[Image.Image], spf: float = 10
|
|
):
|
|
self.animations.append(Animation(x, y, frames, spf))
|
|
|
|
def get_font(
|
|
self, font_name: str, font_size: int
|
|
) -> ImageFont.FreeTypeFont:
|
|
if font := self.fonts.get((font_name, font_size)):
|
|
return font
|
|
print("FONT", font_name, font_size)
|
|
font = ImageFont.truetype(font_name, font_size)
|
|
self.fonts[font_name, font_size] = font
|
|
return font
|
|
|
|
async def __aenter__(self):
|
|
print("Acquiring shared memory")
|
|
self.shmem = SharedMemory(self.shmem_name, True, 500000)
|
|
print("Loading config...")
|
|
try:
|
|
self.reload_config()
|
|
except Exception:
|
|
self.shmem.close()
|
|
self.shmem.unlink()
|
|
raise
|
|
return self
|
|
|
|
async def __aexit__(self, a, b, c):
|
|
if self.shmem:
|
|
print("cleaning up shmem")
|
|
self.shmem.close()
|
|
self.shmem.unlink()
|
|
|
|
|
|
async def main(cfg_path: str = "./settings.json", *_):
|
|
print(f"PID: {os.getpid()}")
|
|
async with Manager(cfg_path) as mgr:
|
|
signal.signal(signal.SIGUSR1, mgr.on_sigusr1)
|
|
signal.signal(signal.SIGUSR2, mgr.on_sigusr2)
|
|
|
|
print("Listening...")
|
|
await asyncio.gather(mgr.listener(), mgr.animator())
|
|
|
|
|
|
if __name__ == "__main__":
|
|
from sys import argv
|
|
|
|
asyncio.run(main(*argv[1:]))
|