onemillioncheckboxes/async-bot.py

440 lines
19 KiB
Python

import asyncio
from typing import Callable, NewType, Optional
from socketio import AsyncClient, AsyncSimpleClient
import socketio
from aiohttp import ClientSession, ClientTimeout
from aiohttp_socks import ProxyConnector
from PIL import Image, ImageFont, ImageDraw, ImageFilter, ImageSequence, ImageChops
from base64 import b64decode
from random import choice, randint
from json import load
from time import time as time_now
import datetime
from contextlib import suppress
# Sparse desired canvas state: flat checkbox index (x + y * 1000) mapped to
# whether that box should be checked.
PixelMap = NewType("PixelMap", dict[int, bool])
# A looping animation: (one PixelMap per frame, seconds each frame is shown).
Animation = NewType("Animation", tuple[list[PixelMap], float])
# Either kind of font object the manager caches and passes to text rendering.
Font = ImageFont.FreeTypeFont | ImageFont.ImageFont
# Timeout configuration shared by every aiohttp ClientSession opened below
# (120 is passed positionally -- presumably the total timeout in seconds;
# confirm against the aiohttp ClientTimeout signature).
TIMEOUT = ClientTimeout(120)
class AsyncBotManager:
    """Mirror the shared checkbox canvas and drive writer bots towards a target.

    A single ``listener`` task keeps ``canvas`` (a 1-bit Pillow image) in sync
    with the server, while any number of ``writer`` coroutines toggle boxes
    whose live value differs from the desired value stored in ``difference``.
    Indices contained in ``avoid`` are never added to the desired state.

    The class is an async context manager: entering it starts the listener
    task, leaving it requests shutdown and waits for the listener to finish.
    """

    # Canvas geometry. Flat bit ``index`` corresponds to the pixel
    # ``(index % WIDTH, index // WIDTH)``.
    WIDTH = 1000
    HEIGHT = 1000
    N_BITS = WIDTH * HEIGHT

    def __init__(self, base: str = "https://onemillioncheckboxes.com"):
        """Create an idle manager talking to the server at *base*."""
        self.base = base
        # Local mirror of the live canvas, maintained by ``listener``.
        self.canvas = Image.new("1", (self.WIDTH, self.HEIGHT))
        # Font cache keyed by (font name, size).
        self.fonts: dict[tuple[str, int], Font] = {
            ("default", 8): ImageFont.load_default(8)
        }
        # Desired state: flat index -> wanted checkbox value.
        self.difference: PixelMap = PixelMap({})
        # Timestamp of the newest server update that has been applied.
        self._last_update = 0
        self._shutdown: bool = False
        # Indices that must never be written.
        self.avoid: set[int] = set()
        self.animations: list[Animation] = []
        # Callables re-evaluated after every received event; each returns a
        # PixelMap that is merged into ``difference``.
        self.animation_functions: list[Callable[[], PixelMap]] = []
        # Throughput counters, reset on every statistics printout.
        self._written_boxes = 0
        self._read_boxes = 0
        self._last_printout = time_now()
        self._n_printouts = 0
        # Writers that emitted at least one toggle since the last printout.
        self._active: set[int] = set()
        self.reader_event = asyncio.Event()
        # Set once the listener has received its first event; writers wait
        # on it before they start toggling.
        self.ready_event = asyncio.Event()

    @staticmethod
    def get_text_image(text: str, font: "Font") -> "Image.Image":
        """Render *text* into a new ``LA`` image.

        The glyphs are drawn with luminance 255. The alpha channel is then
        replaced by the luminance channel widened with a 5x5 max filter, so
        the opaque area is the text plus a small margin around it.
        """
        left, top, right, bottom = font.getbbox(text)
        with Image.new("LA", (int(right - left) + 4, int(bottom - top) + 16), 0) as im:
            draw = ImageDraw.Draw(im)
            draw.rectangle((0, 0, im.width, im.height), (0, 0))
            draw.text((left + 2, top + 2), text, font=font, fill=(255, 0),
                      anchor="lt")
            alpha = im.convert("L").filter(ImageFilter.MaxFilter(5))
            im.putalpha(alpha)
            return im.copy()

    def get_font(self, font_name: str, size: int = 8) -> "Font":
        """Return the font *font_name* at *size*, loading and caching it on first use.

        ``"default"`` selects Pillow's built-in font; any other name is handed
        to ``ImageFont.truetype``.
        """
        if (font := self.fonts.get((font_name, size))):
            return font
        if font_name == "default":
            font = ImageFont.load_default(size)
        else:
            font = ImageFont.truetype(font_name, size)
        self.fonts[font_name, size] = font
        return font

    def put_text(self, x: int, y: int, text: str, font: str, size: int = 8):
        """Render *text* and add it to the desired state at ``(x, y)``."""
        self.put_image(x, y, self.get_text_image(text, self.get_font(font, size)))

    def _is_writable(self, index: int) -> bool:
        """Return True when *index* lies on the canvas and is not avoided."""
        return 0 <= index < self.N_BITS and index not in self.avoid

    def get_image_diff(self, ox: int, oy: int, im: "Image.Image") -> "PixelMap":
        """Return the desired state for ``LA`` image *im* placed at ``(ox, oy)``.

        Nothing is stored on the manager. Pixels with zero alpha, avoided
        indices and indices outside the canvas are left out; the latter would
        otherwise crash the canvas lookups once merged into ``difference``.
        """
        pixmap = PixelMap({})
        for y in range(im.height):
            for x in range(im.width):
                lum, a = im.getpixel((x, y))  # type: ignore
                index = x + ox + (y + oy) * self.WIDTH
                if a and self._is_writable(index):
                    pixmap[index] = lum > 0
        return pixmap

    def put_image(self, ox: int, oy: int, im: "Image.Image"):
        """Add every non-transparent pixel of ``LA`` image *im* to the desired state."""
        for y in range(im.height):
            for x in range(im.width):
                lum, a = im.getpixel((x, y))  # type: ignore
                index = x + ox + (y + oy) * self.WIDTH
                if a:
                    self.put_bit(index, lum > 0)

    def put_pixel(self, x: int, y: int, val: bool):
        """Set the desired value at canvas coordinate ``(x, y)``.

        Coordinates outside the canvas are ignored. In particular ``x == 1000``
        is rejected instead of wrapping onto the first column of the next row.
        """
        if not (0 <= x < self.WIDTH and 0 <= y < self.HEIGHT):
            return
        self.put_bit(x + y * self.WIDTH, val)

    def put_bit(self, index: int, value: bool):
        """Set the desired value of flat *index* unless it is avoided or off-canvas."""
        if self._is_writable(index):
            self.difference[index] = value

    def add_animation(
        self, ox: int, oy: int, frames: "list[Image.Image]", spf: float = 1
    ) -> None:
        """Register a looping animation at ``(ox, oy)``.

        *spf* is the number of seconds each frame stays active. A pixel that
        is transparent in one frame but opaque in the summed alpha of all
        frames is set to ``True`` in that frame, so every frame defines the
        same footprint.

        Raises:
            IndexError: if *frames* is empty.
        """
        animation = Animation(([], spf))
        # Convert once and reuse for both passes.
        frames_la = [frame.convert("LA") for frame in frames]
        alpha = Image.new("L", frames[0].size, 0)
        for frame_la in frames_la:
            alpha = ImageChops.add(alpha, frame_la.getchannel("A"))
        for frame_la in frames_la:
            pixelmap = PixelMap({})
            for y in range(frame_la.height):
                for x in range(frame_la.width):
                    lum, a = frame_la.getpixel((x, y))  # type: ignore
                    ga: int = alpha.getpixel((x, y))  # type: ignore
                    index = x + ox + (y + oy) * self.WIDTH
                    if (a > 128 or ga > 128) and self._is_writable(index):
                        pixelmap[index] = lum > 128 if a > 128 else True
            animation[0].append(pixelmap)
        self.animations.append(animation)

    def add_avoid_rect(self, sx: int, sy: int, w: int, h: int) -> None:
        """Mark the ``w`` x ``h`` rectangle whose top-left is ``(sx, sy)`` as avoided."""
        for y in range(sy, sy + h):
            row_start = y * self.WIDTH
            self.add_avoid_range(sx + row_start, sx + w + row_start)

    def add_avoid_range(self, start: int, stop: int, step: int = 1) -> None:
        """Mark the flat indices ``range(start, stop, step)`` as avoided."""
        self.avoid |= set(range(start, stop, step))

    def add_avoid_index(self, *indices: int) -> None:
        """Mark the given flat indices as avoided."""
        self.avoid |= set(indices)

    def get_difference_image(self) -> "Image.Image":
        """Return an ``LA`` preview of the desired state.

        Targeted pixels are opaque, white for checked and black for unchecked;
        everything else stays transparent.
        """
        with Image.new("LA", (self.WIDTH, self.HEIGHT), 0) as im:
            for index, expected in self.difference.items():
                y, x = divmod(index, self.WIDTH)
                im.putpixel((x, y), (255 if expected else 0, 255))
            return im.copy()

    def get_avoid_image(self) -> "Image.Image":
        """Return an RGB preview: avoided pixels red, all others green."""
        with Image.new("RGB", (self.WIDTH, self.HEIGHT), (0, 255, 0)) as im:
            for index in self.avoid:
                y, x = divmod(index, self.WIDTH)
                im.putpixel((x, y), (255, 0, 0))
            return im.copy()

    def _load_full_state(self, data: dict) -> None:
        """Replace the local canvas with the snapshot carried by *data*.

        *data* must provide ``"full_state"`` (base64 text) and ``"timestamp"``.
        """
        # One "=" is appended before decoding -- presumably the server sends
        # the base64 text without its padding character; TODO confirm.
        buffer = b64decode(data["full_state"].encode() + b"=")
        image = Image.frombytes("1", (self.WIDTH, self.HEIGHT), buffer)
        self.canvas.paste(image)
        image.close()
        self._last_update = data["timestamp"]

    def _apply_toggles(self, data) -> None:
        """Apply a ``batched_bit_toggles`` payload ``(bits_on, bits_off, timestamp)``."""
        bits_on, bits_off, timestamp = data
        if timestamp < self._last_update:
            print("SKIPPING UPDATES: TOO OLD")
            return
        self._last_update = timestamp
        self._read_boxes += len(bits_on) + len(bits_off)
        for ndx in bits_on:
            y, x = divmod(ndx, self.WIDTH)
            self.canvas.putpixel((x, y), 255)
        for ndx in bits_off:
            y, x = divmod(ndx, self.WIDTH)
            self.canvas.putpixel((x, y), 0)

    def _report_stats(self, now: float) -> bool:
        """Print throughput and accuracy figures and reset the counters.

        Returns True when the listener should stop: fewer than two writers
        were active in the elapsed interval and more than three reports have
        already been printed. ``_shutdown`` is set in that case.
        """
        elapsed = now - self._last_printout
        outgoing = self._written_boxes / elapsed
        incoming = self._read_boxes / elapsed
        print()
        print(f"I/O: {incoming:7.2f}/s | {outgoing:7.2f}/s")
        print(f"Alive workers: {len(self._active)}")
        if len(self._active) < 2 and self._n_printouts > 3:
            print("Too few workers, dying!")
            self._shutdown = True
            return True
        self._n_printouts += 1
        n_correct, n_wrong = 0, 0
        for index, expected in self.difference.items():
            y, x = divmod(index, self.WIDTH)
            actual = self.canvas.getpixel((x, y)) > 0  # type: ignore
            if expected != actual:
                n_wrong += 1
            else:
                n_correct += 1
        print(f"Invalid: {n_wrong:4d} Valid: {n_correct:4d}")
        self._written_boxes = 0
        self._read_boxes = 0
        self._active.clear()
        self._last_printout = now
        return False

    def _update_animations(self, now: float) -> None:
        """Merge the animation frames that are current at time *now* into ``difference``."""
        for pixmaps, spf in self.animations:
            frame_index = int(now / spf)
            self.difference.update(pixmaps[frame_index % len(pixmaps)])
        for func in self.animation_functions:
            self.difference.update(func())

    async def listener(self) -> None:
        """Keep ``canvas`` in sync with the server until shutdown.

        Fetches the initial snapshot over HTTP, then consumes socket.io
        events. After every event the animations are advanced, and roughly
        every five seconds statistics are printed. Any exception is printed,
        sets ``_shutdown`` and is re-raised.
        """
        socket_url = f"{self.base}/socket.io"
        try:
            async with ClientSession(timeout=TIMEOUT) as http:
                print("Getting initial state")
                async with http.get(f"{self.base}/api/initial-state") as req:
                    self._load_full_state(await req.json())
                print("Initial state received")
                async with AsyncSimpleClient(http_session=http) as sio:
                    await sio.connect(socket_url)
                    while not self._shutdown:
                        try:
                            async with asyncio.timeout(20):
                                event, data = await sio.receive()
                        except TimeoutError:
                            print("Reading failed")
                            if not sio.connected:
                                print("Reconnecting")
                                await sio.connect(socket_url)
                            continue
                        self.reader_event.set()
                        self.ready_event.set()
                        if event == "full_state":
                            self._load_full_state(data)
                        elif event == "batched_bit_toggles":
                            self._apply_toggles(data)
                        else:
                            print("unknown event", event, data)
                        now = time_now()
                        if (now - self._last_printout) > 5 and self._report_stats(now):
                            return
                        self._update_animations(now)
        except Exception as e:
            print(f"Listener died: {e!r}")
            self._shutdown = True
            raise

    async def writer(
        self,
        bot_index: int,
        proxy_url: Optional[str] = None,
        delay: float = 0.25,
    ):
        """Toggle mismatching boxes until shutdown.

        Args:
            bot_index: identifier used in log lines and in the active set.
            proxy_url: optional proxy URL for this writer's connection.
            delay: seconds to sleep after each emitted toggle.

        Each round probes up to 100 entries of the sorted desired state,
        advancing by a random stride, and toggles the first one whose live
        value is wrong. Exceptions are printed, not propagated.
        """
        proxy = ProxyConnector.from_url(proxy_url) if proxy_url else None
        try:
            async with ClientSession(connector=proxy, timeout=TIMEOUT) as http:
                async with AsyncSimpleClient(http_session=http) as sio:
                    await sio.connect(f"{self.base}/socket.io")
                    print(f"Writer {bot_index} connected, waiting...")
                    await self.ready_event.wait()
                    print(f"Writer {bot_index} running")
                    offset = 0
                    while not self._shutdown:
                        # Keys are unique, so plain tuple ordering sorts by index.
                        diff = sorted(self.difference.items())
                        # With nothing to draw there is nothing to probe;
                        # ``offset % len(diff)`` would divide by zero.
                        if diff:
                            for _ in range(100):
                                index, expected = diff[offset % len(diff)]
                                offset += randint(0, 1000)
                                y, x = divmod(index, self.WIDTH)
                                current = self.canvas.getpixel((x, y)) > 0  # type: ignore
                                if current != expected:
                                    self._written_boxes += 1
                                    await sio.emit("toggle_bit", {"index": index})
                                    await asyncio.sleep(delay)
                                    self._active.add(bot_index)
                                    break
                        # Best-effort read with a short timeout; failures
                        # (including the timeout) are ignored. Only Exception
                        # is suppressed so task cancellation and
                        # KeyboardInterrupt still propagate.
                        with suppress(Exception):
                            await sio.receive(0.1)
                    print(f"Worker {bot_index} stopped: shutdown")
        except Exception as e:
            print(f"Worker {bot_index} died: {e!r}")

    async def __aenter__(self):
        """Start the listener task and return the manager."""
        self._listener_task = asyncio.create_task(self.listener())
        return self

    async def __aexit__(self, a, b, c):
        """Request shutdown and wait for the listener task.

        An exception raised inside the listener is re-raised here.
        """
        self._shutdown = True
        await self._listener_task
def _register_avoid(mgr: "AsyncBotManager", avoid: dict) -> None:
    """Apply one entry of the ``"avoid"`` settings list to *mgr*.

    Supported ``"type"`` values are ``"rect"``, ``"range"`` and ``"image"``;
    any other type is ignored.
    """
    kind = avoid["type"]
    if kind == "rect":
        mgr.add_avoid_rect(avoid["x"], avoid["y"], avoid["w"], avoid["h"])
        print("AVOID rectangle {w}x{h}+{x}+{y}".format(**avoid))
    elif kind == "range":
        mgr.add_avoid_range(
            avoid["start"], avoid["stop"], avoid.get("step", 1)
        )
        print("AVOID range {start}-{stop}".format(**avoid))
    elif kind == "image":
        # Close both the opened file and the converted copy.
        with Image.open(avoid["path"]) as src, src.convert("LA") as im:
            assert im.width == 1000 and im.height == 1000
            for y in range(im.height):
                for x in range(im.width):
                    _, a = im.getpixel((x, y))  # type: ignore
                    if a > 128:
                        mgr.add_avoid_index(x + y * 1000)
        print("AVOID image", avoid["path"])


def _make_time_updater(mgr: "AsyncBotManager", elem: dict) -> "Callable[[], PixelMap]":
    """Build the animation function for one ``"time"`` element.

    The element's settings are captured here, per call, so several clocks
    keep their own format, position and font instead of all sharing the
    values of the last one configured.
    """
    time_format = elem["format"]
    pos_x, pos_y = elem["x"], elem["y"]
    font = mgr.get_font(elem.get("font", "default"), elem.get("size", 8))

    def update() -> PixelMap:
        now = datetime.datetime.now(datetime.timezone.utc)
        img = mgr.get_text_image(now.strftime(time_format), font)
        try:
            return mgr.get_image_diff(pos_x, pos_y, img)
        finally:
            img.close()

    return update


def _put_blob(mgr: "AsyncBotManager", elem: dict) -> None:
    """Write the bits of the file ``elem["path"]``, MSB first, from ``elem["offset"]``.

    At most ``elem["length"]`` bits (default 1000000) are written.
    """
    offset = elem["offset"]
    length = max(elem.get("length", 1000000), 0)
    with open(elem["path"], "rb") as fp:
        # Only the bytes that can contribute bits are needed.
        payload = fp.read((length + 7) // 8)
    for bit_no in range(min(length, len(payload) * 8)):
        byte = payload[bit_no >> 3]
        mgr.put_bit(offset + bit_no, bool((byte >> (7 - (bit_no & 7))) & 1))


def _register_element(mgr: "AsyncBotManager", elem: dict) -> None:
    """Apply one entry of the ``"elements"`` settings list to *mgr*.

    Unknown ``"type"`` values are ignored.
    """
    kind = elem["type"]
    if kind == "text":
        mgr.put_text(
            elem["x"], elem["y"], elem["text"],
            elem.get("font", "default"), elem.get("size", 8),
        )
        print("ADD text", elem)
    elif kind == "text_anim":
        font = mgr.get_font(elem.get("font", "default"), elem.get("size", 8))
        frames = [mgr.get_text_image(text, font) for text in elem["lines"]]
        mgr.add_animation(elem["x"], elem["y"], frames, elem["spf"])
        for frame in frames:
            frame.close()
        print("ADD text animation", elem)
    elif kind == "image":
        with Image.open(elem["path"]) as src, src.convert("LA") as im:
            mgr.put_image(elem["x"], elem["y"], im)
        print("ADD image", elem)
    elif kind == "time":
        mgr.animation_functions.append(_make_time_updater(mgr, elem))
    elif kind == "tile":
        # Repeat the image to fill a w x h area (defaults to the image size).
        with Image.open(elem["path"]) as src, src.convert("LA") as im:
            for oy in range(elem.get("h", im.height)):
                for ox in range(elem.get("w", im.width)):
                    lum, a = im.getpixel((ox % im.width, oy % im.height))  # type: ignore
                    if a:
                        x, y = elem["x"] + ox, elem["y"] + oy
                        mgr.put_bit(x + y * 1000, lum > 0)
        print("ADD tile", elem)
    elif kind == "animation":
        with Image.open(elem["path"]) as anim:
            frames = [frame.copy() for frame in ImageSequence.Iterator(anim)]
        mgr.add_animation(elem["x"], elem["y"], frames, elem["spf"])
        for frame in frames:
            frame.close()
        print("ADD animation", elem)
    elif kind == "rgb111":
        ox, oy = elem["x"], elem["y"]
        with Image.open(elem["path"]) as src, src.convert("RGBA") as im:
            for y in range(im.height):
                for x in range(im.width):
                    r, g, b, a = im.getpixel((x, y))  # type: ignore
                    if a < 128:
                        continue
                    # Three consecutive bits per pixel, 577 pixels per row
                    # (NOTE(review): row width presumably matches an
                    # external viewer's layout -- confirm).
                    start_ndx = (x + ox + (y + oy) * 577) * 3
                    mgr.put_bit(start_ndx, r > 128)
                    mgr.put_bit(start_ndx + 1, g > 128)
                    mgr.put_bit(start_ndx + 2, b > 128)
    elif kind == "rgb565":
        ox, oy = elem["x"], elem["y"]
        with Image.open(elem["path"]) as src, src.convert("RGBA") as im:
            for y in range(im.height):
                for x in range(im.width):
                    r, g, b, a = im.getpixel((x, y))  # type: ignore
                    if a < 128:
                        continue
                    # Sixteen bits per pixel, 250 pixels per row.
                    offset: int = (x + ox + (y + oy) * 250) * 16
                    # pack rgb888 to rgb565
                    rgb: int = ((r >> 3) << 11) | ((g >> 2) << 5) | (b >> 3)
                    # write to a bitstream, most significant bit first
                    for i in range(16):
                        mgr.put_bit(offset + i, ((rgb << i) & 0x8000) > 0)
    elif kind == "rect":
        for y in range(elem["y"], elem["y"] + elem["h"]):
            for x in range(elem["x"], elem["x"] + elem["w"]):
                mgr.put_pixel(x, y, elem["fill"])
    elif kind == "blob":
        _put_blob(mgr, elem)


async def amain(conf_path: str = "settings.json", *_) -> None:
    """Load the settings file, build the desired canvas state and run the bots.

    Args:
        conf_path: path to a JSON file providing the keys ``"avoid"``,
            ``"elements"``, ``"proxies"``, ``"delay"`` and ``"n_bots"``.
        *_: extra positional arguments are accepted and ignored.

    Avoid regions are registered before elements so that elements never
    target avoided boxes. Previews of the result are saved to ``result.png``
    and ``avoid.png`` before the writers start. Writers are assigned proxies
    round-robin; with an empty proxy list they connect directly.
    """
    with open(conf_path, "r", encoding="utf-8") as fp:
        settings = load(fp)
    async with AsyncBotManager() as mgr:
        for avoid in settings["avoid"]:
            _register_avoid(mgr, avoid)
        for elem in settings["elements"]:
            _register_element(mgr, elem)
        mgr.get_difference_image().save("result.png")
        mgr.get_avoid_image().save("avoid.png")
        print("Starting writers...")
        proxies = settings["proxies"]
        res = await asyncio.gather(
            *(
                mgr.writer(
                    i,
                    proxies[i % len(proxies)] if proxies else None,
                    settings["delay"],
                )
                for i in range(settings["n_bots"])
            ),
        )
        for ret in res:
            print("RETURN", repr(ret))
        print("Shutting down...")
        mgr._shutdown = True
if __name__ == "__main__":
    from sys import argv

    # Everything after the script name is forwarded positionally to amain(),
    # so the first command-line argument, if given, is the settings path.
    cli_args = argv[1:]
    entry_point = amain(*cli_args)
    asyncio.run(entry_point, debug=True)