From 9c4c9774ca3743fc29330b6ef7675c417c287549 Mon Sep 17 00:00:00 2001 From: hkc Date: Mon, 8 Jul 2024 22:09:25 +0300 Subject: [PATCH] Added negation support to `text` element --- async-bot.py | 115 ++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 92 insertions(+), 23 deletions(-) diff --git a/async-bot.py b/async-bot.py index 09ec395..ba55d5d 100644 --- a/async-bot.py +++ b/async-bot.py @@ -4,7 +4,14 @@ from socketio import AsyncClient, AsyncSimpleClient import socketio from aiohttp import ClientSession, ClientTimeout from aiohttp_socks import ProxyConnector -from PIL import Image, ImageFont, ImageDraw, ImageFilter, ImageSequence, ImageChops +from PIL import ( + Image, + ImageFont, + ImageDraw, + ImageFilter, + ImageSequence, + ImageChops, +) from base64 import b64decode from random import choice, randint from json import load @@ -19,6 +26,7 @@ Font = ImageFont.FreeTypeFont | ImageFont.ImageFont TIMEOUT = ClientTimeout(120) + class AsyncBotManager: def __init__(self, base: str = "https://onemillioncheckboxes.com"): self.base = base @@ -46,20 +54,31 @@ class AsyncBotManager: self.ready_event = asyncio.Event() @staticmethod - def get_text_image(text: str, font: ImageFont.ImageFont | ImageFont.FreeTypeFont) -> Image.Image: + def get_text_image( + text: str, + font: ImageFont.ImageFont | ImageFont.FreeTypeFont, + negative: bool = False, + ) -> Image.Image: left, top, right, bottom = font.getbbox(text) - with Image.new("LA", (int(right - left) + 4, int(bottom - top) + 16), 0) as im: + with Image.new( + "LA", (int(right - left) + 4, int(bottom - top) + 16), 0 + ) as im: draw = ImageDraw.Draw(im) draw.rectangle((0, 0, im.width, im.height), (0, 0)) - draw.text((left + 2, top + 2), text, font=font, fill=(255, 0), - anchor="lt") + draw.text( + (left + 2, top + 2), + text, + font=font, + fill=(0 if negative else 255, 0), + anchor="lt", + ) alpha = im.convert("L").filter(ImageFilter.MaxFilter(5)) im.putalpha(alpha) return im.copy() def get_font(self, font_name: str, size: int = 8): - if (font := self.fonts.get((font_name, size))): + if font := self.fonts.get((font_name, size)): return font if font_name == "default": font = ImageFont.load_default(size) @@ -68,9 +87,20 @@ class AsyncBotManager: self.fonts[font_name, size] = font return font - def put_text(self, x: int, y: int, text: str, font: str, size: int = 8): - self.put_image(x, y, self.get_text_image(text, self.get_font(font, size))) - + def put_text( + self, + x: int, + y: int, + text: str, + font: str, + size: int = 8, + negative: bool = False, + ): + self.put_image( + x, + y, + self.get_text_image(text, self.get_font(font, size), negative), + ) def get_image_diff(self, ox: int, oy: int, im: Image.Image) -> PixelMap: pixmap = PixelMap({}) @@ -154,7 +184,9 @@ class AsyncBotManager: async with http.get(f"{self.base}/api/initial-state") as req: data = await req.json() buffer = b64decode(data["full_state"].encode() + b"=") - self.canvas.paste(Image.frombytes("1", (1000, 1000), buffer)) + self.canvas.paste( + Image.frombytes("1", (1000, 1000), buffer) + ) self._last_update = data["timestamp"] print("Initial state received") async with AsyncSimpleClient(http_session=http) as sio: @@ -172,7 +204,9 @@ class AsyncBotManager: self.reader_event.set() self.ready_event.set() if event == "full_state": - buffer = b64decode(data["full_state"].encode() + b"=") + buffer = b64decode( + data["full_state"].encode() + b"=" + ) image = Image.frombytes("1", (1000, 1000), buffer) self.canvas.paste(image) image.close() @@ -183,7 +217,9 @@ class AsyncBotManager: print("SKIPPING UPDATES: TOO OLD") else: self._last_update = timestamp - self._read_boxes += len(bits_on) + len(bits_off) + self._read_boxes += len(bits_on) + len( + bits_off + ) for ndx in bits_on: y, x = divmod(ndx, 1000) self.canvas.putpixel((x, y), 255) @@ -194,10 +230,16 @@ class AsyncBotManager: print("unknown event", event, data) now = time_now() if (now - self._last_printout) > 5: - outgoing = self._written_boxes / (now - self._last_printout) - incoming = self._read_boxes / (now - self._last_printout) + outgoing = self._written_boxes / ( + now - self._last_printout + ) + incoming = self._read_boxes / ( + now - self._last_printout + ) print() - print(f"I/O: {incoming:7.2f}/s | {outgoing:7.2f}/s") + print( + f"I/O: {incoming:7.2f}/s | {outgoing:7.2f}/s" + ) print(f"Alive workers: {len(self._active)}") if len(self._active) < 2 and self._n_printouts > 3: @@ -216,7 +258,9 @@ class AsyncBotManager: else: n_correct += 1 - print(f"Invalid: {n_wrong:4d} Valid: {n_correct:4d}") + print( + f"Invalid: {n_wrong:4d} Valid: {n_correct:4d}" + ) self._written_boxes = 0 self._read_boxes = 0 self._active.clear() @@ -307,12 +351,27 @@ async def amain(conf_path: str = "settings.json", *_) -> None: for elem in settings["elements"]: if elem["type"] == "text": - mgr.put_text(elem["x"], elem["y"], elem["text"], elem.get("font", "default"), elem.get("size", 8)) + mgr.put_text( + elem["x"], + elem["y"], + elem["text"], + elem.get("font", "default"), + elem.get("size", 8), + elem.get("negative", False), + ) print("ADD text", elem) elif elem["type"] == "text_anim": frames: list[Image.Image] = [] for text in elem["lines"]: - frames.append(mgr.get_text_image(text, mgr.get_font(elem.get("font", "default"), elem.get("size", 8)))) + frames.append( + mgr.get_text_image( + text, + mgr.get_font( + elem.get("font", "default"), + elem.get("size", 8), + ), + ) + ) mgr.add_animation(elem["x"], elem["y"], frames, elem["spf"]) for frame in frames: frame.close() @@ -325,7 +384,10 @@ async def amain(conf_path: str = "settings.json", *_) -> None: elif elem["type"] == "time": time_format = elem["format"] pos_x, pos_y = elem["x"], elem["y"] - font = mgr.get_font(elem.get("font", "default"), elem.get("size", 8)) + font = mgr.get_font( + elem.get("font", "default"), elem.get("size", 8) + ) + def update() -> PixelMap: now = datetime.datetime.now(datetime.timezone.utc) txt = now.strftime(time_format) @@ -333,6 +395,7 @@ async def amain(conf_path: str = "settings.json", *_) -> None: pixmap = mgr.get_image_diff(pos_x, pos_y, img) img.close() return pixmap + mgr.animation_functions.append(update) elif elem["type"] == "tile": with Image.open(elem["path"]).convert("LA") as im: @@ -349,7 +412,9 @@ async def amain(conf_path: str = "settings.json", *_) -> None: frames: list[Image.Image] = [] for frame in ImageSequence.Iterator(anim): frames.append(frame.copy()) - mgr.add_animation(elem["x"], elem["y"], frames, elem["spf"]) + mgr.add_animation( + elem["x"], elem["y"], frames, elem["spf"] + ) for frame in frames: frame.close() print("ADD animation", elem) @@ -380,7 +445,7 @@ async def amain(conf_path: str = "settings.json", *_) -> None: # pack rgb888 to rgb565 rgb: int = (r >> 3) << 11 rgb |= (g >> 2) << 5 - rgb |= (b >> 3) + rgb |= b >> 3 # write to a bitstream for i in range(16): @@ -396,7 +461,7 @@ async def amain(conf_path: str = "settings.json", *_) -> None: offset = elem["offset"] length = elem.get("length", 1000000) written = 0 - while (char := fp.read(1)): + while char := fp.read(1): byte = char[0] if written > length: break @@ -424,7 +489,10 @@ async def amain(conf_path: str = "settings.json", *_) -> None: ) else: res = await asyncio.gather( - *[mgr.writer(i, None, settings["delay"]) for i in range(settings["n_bots"])], + *[ + mgr.writer(i, None, settings["delay"]) + for i in range(settings["n_bots"]) + ], ) for ret in res: @@ -436,4 +504,5 @@ async def amain(conf_path: str = "settings.json", *_) -> None: if __name__ == "__main__": from sys import argv + asyncio.run(amain(*argv[1:]), debug=True)