onemillioncheckboxes/async-bot.py

130 lines
5.2 KiB
Python
Raw Normal View History

2024-07-03 22:01:42 +03:00
import asyncio
from socketio import AsyncSimpleClient
from aiohttp import ClientSession
from PIL import Image, ImageFont, ImageDraw, ImageFilter
from base64 import b64decode
from random import choice
from json import load
class AsyncBotManager:
def __init__(self, base: str = "https://onemillioncheckboxes.com"):
self.base = base
self.canvas = Image.new("1", (1000, 1000))
self.font = ImageFont.load_default(8)
self.difference: dict[int, bool] = {}
self._shutdown: bool = False
def put_text(self, x: int, y: int, text: str):
with Image.new("LA", (int(self.font.getlength(text) + 12), 16)) as im:
draw = ImageDraw.Draw(im)
draw.rectangle((0, 0, im.width, im.height), (0, 0))
draw.text((6, 5), text, font=self.font, fill=(255, 0))
alpha = im.convert("L").filter(ImageFilter.MaxFilter(3))
im.putalpha(alpha)
self.put_image(x, y, im)
def put_image(self, ox: int, oy: int, im: Image.Image):
for y in range(im.height):
for x in range(im.width):
l, a = im.getpixel((x, y)) # type: ignore
if a:
self.difference[x + ox + (y + oy) * 1000] = l > 0
async def listener(self):
async with ClientSession() as http:
async with http.get(f"{self.base}/api/initial-state") as req:
data = await req.json()
buffer = b64decode(data["full_state"].encode() + b"=")
self.canvas.paste(Image.frombytes("1", (1000, 1000), buffer))
async with AsyncSimpleClient(http_session=http) as sio:
await sio.connect(f"{self.base}/socket.io")
while not self._shutdown:
event, data = await sio.receive()
if event == "full_state":
buffer = b64decode(data["full_state"].encode() + b"=")
image = Image.frombytes("1", (1000, 1000), buffer)
self.canvas.paste(image)
image.close()
elif event == "batched_bit_toggles":
bits_on, bits_off, timestamp = data
for ndx in bits_on:
y, x = divmod(ndx, 1000)
self.canvas.putpixel((x, y), 255)
for ndx in bits_off:
y, x = divmod(ndx, 1000)
self.canvas.putpixel((x, y), 0)
else:
print("unknown event", event, data)
async def writer(self):
async with ClientSession() as http:
async with AsyncSimpleClient(http_session=http) as sio:
await sio.connect(f"{self.base}/socket.io")
while not self._shutdown:
try:
event, data = await sio.receive(0.1)
if event not in ("full_state", "batched_bit_toggles"):
print("!!!", event, data)
except Exception:
pass
index, target = choice(list(self.difference.items()))
y, x = divmod(index, 1000)
curr = self.canvas.getpixel((x, y)) > 0 # type: ignore
if curr != target:
print("swap", x, y, index)
await sio.emit("toggle_bit", {
"index": index
})
await asyncio.sleep(0.25)
else:
await asyncio.sleep(0.001)
async def __aenter__(self):
self._listener_task = asyncio.create_task(self.listener())
return self
async def __aexit__(self, a, b, c):
self._shutdown = True
await self._listener_task
async def amain():
with open("settings.json", "r") as fp:
settings = load(fp)
async with AsyncBotManager() as mgr:
mgr.font = ImageFont.truetype(settings["font"], 8)
for elem in settings["elements"]:
if elem["type"] == "text":
mgr.put_text(elem["text"], elem["x"], elem["y"])
elif elem["type"] == "image":
with Image.open(elem["path"]).convert("LA") as im:
mgr.put_image(elem["x"], elem["y"], im)
elif elem["type"] == "rgb111":
ox, oy = elem["x"], elem["y"]
with Image.open(elem["path"]).convert("RGB") as im:
for y in range(im.height):
for x in range(im.width):
r, g, b = im.getpixel((x, y)) # type: ignore
pocket_x = x + ox
pocket_y = y + oy
ndx_start = pocket_x * 3 + pocket_y * 577 * 3
mgr.difference[ndx_start] = r > 128
mgr.difference[ndx_start + 1] = g > 128
mgr.difference[ndx_start + 2] = b > 128
await asyncio.gather(*[
mgr.writer() for _ in range(settings["n_bots"])
], return_exceptions=True)
if __name__ == "__main__":
asyncio.run(amain())