Added negation support to `text` element

This commit is contained in:
Casey 2024-07-08 22:09:25 +03:00
parent 9979897bd0
commit 9c4c9774ca
Signed by: hkc
GPG Key ID: F0F6CFE11CDB0960
1 changed files with 92 additions and 23 deletions

View File

@ -4,7 +4,14 @@ from socketio import AsyncClient, AsyncSimpleClient
import socketio
from aiohttp import ClientSession, ClientTimeout
from aiohttp_socks import ProxyConnector
from PIL import Image, ImageFont, ImageDraw, ImageFilter, ImageSequence, ImageChops
from PIL import (
Image,
ImageFont,
ImageDraw,
ImageFilter,
ImageSequence,
ImageChops,
)
from base64 import b64decode
from random import choice, randint
from json import load
@ -19,6 +26,7 @@ Font = ImageFont.FreeTypeFont | ImageFont.ImageFont
TIMEOUT = ClientTimeout(120)
class AsyncBotManager:
def __init__(self, base: str = "https://onemillioncheckboxes.com"):
self.base = base
@ -46,20 +54,31 @@ class AsyncBotManager:
self.ready_event = asyncio.Event()
@staticmethod
def get_text_image(text: str, font: ImageFont.ImageFont | ImageFont.FreeTypeFont) -> Image.Image:
def get_text_image(
text: str,
font: ImageFont.ImageFont | ImageFont.FreeTypeFont,
negative: bool = False,
) -> Image.Image:
left, top, right, bottom = font.getbbox(text)
with Image.new("LA", (int(right - left) + 4, int(bottom - top) + 16), 0) as im:
with Image.new(
"LA", (int(right - left) + 4, int(bottom - top) + 16), 0
) as im:
draw = ImageDraw.Draw(im)
draw.rectangle((0, 0, im.width, im.height), (0, 0))
draw.text((left + 2, top + 2), text, font=font, fill=(255, 0),
anchor="lt")
draw.text(
(left + 2, top + 2),
text,
font=font,
fill=(0 if negative else 255, 0),
anchor="lt",
)
alpha = im.convert("L").filter(ImageFilter.MaxFilter(5))
im.putalpha(alpha)
return im.copy()
def get_font(self, font_name: str, size: int = 8):
if (font := self.fonts.get((font_name, size))):
if font := self.fonts.get((font_name, size)):
return font
if font_name == "default":
font = ImageFont.load_default(size)
@ -68,9 +87,20 @@ class AsyncBotManager:
self.fonts[font_name, size] = font
return font
def put_text(self, x: int, y: int, text: str, font: str, size: int = 8):
self.put_image(x, y, self.get_text_image(text, self.get_font(font, size)))
def put_text(
self,
x: int,
y: int,
text: str,
font: str,
size: int = 8,
negative: bool = False,
):
self.put_image(
x,
y,
self.get_text_image(text, self.get_font(font, size), negative),
)
def get_image_diff(self, ox: int, oy: int, im: Image.Image) -> PixelMap:
pixmap = PixelMap({})
@ -154,7 +184,9 @@ class AsyncBotManager:
async with http.get(f"{self.base}/api/initial-state") as req:
data = await req.json()
buffer = b64decode(data["full_state"].encode() + b"=")
self.canvas.paste(Image.frombytes("1", (1000, 1000), buffer))
self.canvas.paste(
Image.frombytes("1", (1000, 1000), buffer)
)
self._last_update = data["timestamp"]
print("Initial state received")
async with AsyncSimpleClient(http_session=http) as sio:
@ -172,7 +204,9 @@ class AsyncBotManager:
self.reader_event.set()
self.ready_event.set()
if event == "full_state":
buffer = b64decode(data["full_state"].encode() + b"=")
buffer = b64decode(
data["full_state"].encode() + b"="
)
image = Image.frombytes("1", (1000, 1000), buffer)
self.canvas.paste(image)
image.close()
@ -183,7 +217,9 @@ class AsyncBotManager:
print("SKIPPING UPDATES: TOO OLD")
else:
self._last_update = timestamp
self._read_boxes += len(bits_on) + len(bits_off)
self._read_boxes += len(bits_on) + len(
bits_off
)
for ndx in bits_on:
y, x = divmod(ndx, 1000)
self.canvas.putpixel((x, y), 255)
@ -194,10 +230,16 @@ class AsyncBotManager:
print("unknown event", event, data)
now = time_now()
if (now - self._last_printout) > 5:
outgoing = self._written_boxes / (now - self._last_printout)
incoming = self._read_boxes / (now - self._last_printout)
outgoing = self._written_boxes / (
now - self._last_printout
)
incoming = self._read_boxes / (
now - self._last_printout
)
print()
print(f"I/O: {incoming:7.2f}/s | {outgoing:7.2f}/s")
print(
f"I/O: {incoming:7.2f}/s | {outgoing:7.2f}/s"
)
print(f"Alive workers: {len(self._active)}")
if len(self._active) < 2 and self._n_printouts > 3:
@ -216,7 +258,9 @@ class AsyncBotManager:
else:
n_correct += 1
print(f"Invalid: {n_wrong:4d} Valid: {n_correct:4d}")
print(
f"Invalid: {n_wrong:4d} Valid: {n_correct:4d}"
)
self._written_boxes = 0
self._read_boxes = 0
self._active.clear()
@ -307,12 +351,27 @@ async def amain(conf_path: str = "settings.json", *_) -> None:
for elem in settings["elements"]:
if elem["type"] == "text":
mgr.put_text(elem["x"], elem["y"], elem["text"], elem.get("font", "default"), elem.get("size", 8))
mgr.put_text(
elem["x"],
elem["y"],
elem["text"],
elem.get("font", "default"),
elem.get("size", 8),
elem.get("negative", False),
)
print("ADD text", elem)
elif elem["type"] == "text_anim":
frames: list[Image.Image] = []
for text in elem["lines"]:
frames.append(mgr.get_text_image(text, mgr.get_font(elem.get("font", "default"), elem.get("size", 8))))
frames.append(
mgr.get_text_image(
text,
mgr.get_font(
elem.get("font", "default"),
elem.get("size", 8),
),
)
)
mgr.add_animation(elem["x"], elem["y"], frames, elem["spf"])
for frame in frames:
frame.close()
@ -325,7 +384,10 @@ async def amain(conf_path: str = "settings.json", *_) -> None:
elif elem["type"] == "time":
time_format = elem["format"]
pos_x, pos_y = elem["x"], elem["y"]
font = mgr.get_font(elem.get("font", "default"), elem.get("size", 8))
font = mgr.get_font(
elem.get("font", "default"), elem.get("size", 8)
)
def update() -> PixelMap:
now = datetime.datetime.now(datetime.timezone.utc)
txt = now.strftime(time_format)
@ -333,6 +395,7 @@ async def amain(conf_path: str = "settings.json", *_) -> None:
pixmap = mgr.get_image_diff(pos_x, pos_y, img)
img.close()
return pixmap
mgr.animation_functions.append(update)
elif elem["type"] == "tile":
with Image.open(elem["path"]).convert("LA") as im:
@ -349,7 +412,9 @@ async def amain(conf_path: str = "settings.json", *_) -> None:
frames: list[Image.Image] = []
for frame in ImageSequence.Iterator(anim):
frames.append(frame.copy())
mgr.add_animation(elem["x"], elem["y"], frames, elem["spf"])
mgr.add_animation(
elem["x"], elem["y"], frames, elem["spf"]
)
for frame in frames:
frame.close()
print("ADD animation", elem)
@ -380,7 +445,7 @@ async def amain(conf_path: str = "settings.json", *_) -> None:
# pack rgb888 to rgb565
rgb: int = (r >> 3) << 11
rgb |= (g >> 2) << 5
rgb |= (b >> 3)
rgb |= b >> 3
# write to a bitstream
for i in range(16):
@ -396,7 +461,7 @@ async def amain(conf_path: str = "settings.json", *_) -> None:
offset = elem["offset"]
length = elem.get("length", 1000000)
written = 0
while (char := fp.read(1)):
while char := fp.read(1):
byte = char[0]
if written > length:
break
@ -424,7 +489,10 @@ async def amain(conf_path: str = "settings.json", *_) -> None:
)
else:
res = await asyncio.gather(
*[mgr.writer(i, None, settings["delay"]) for i in range(settings["n_bots"])],
*[
mgr.writer(i, None, settings["delay"])
for i in range(settings["n_bots"])
],
)
for ret in res:
@ -436,4 +504,5 @@ async def amain(conf_path: str = "settings.json", *_) -> None:
if __name__ == "__main__":
from sys import argv
asyncio.run(amain(*argv[1:]), debug=True)