onemillioncheckboxes/async-bot.py

357 lines
15 KiB
Python

import asyncio
from typing import NewType, Optional
from socketio import AsyncClient, AsyncSimpleClient
from aiohttp import ClientSession
from aiohttp_socks import ProxyConnector
from PIL import Image, ImageFont, ImageDraw, ImageFilter, ImageSequence, ImageChops
from base64 import b64decode
from random import choice, randint
from json import load
from time import time as time_now
PixelMap = NewType("PixelMap", dict[int, bool])
Animation = NewType("Animation", tuple[list[PixelMap], float])
Font = ImageFont.FreeTypeFont | ImageFont.ImageFont
class AsyncBotManager:
def __init__(self, base: str = "https://onemillioncheckboxes.com"):
self.base = base
self.canvas = Image.new("1", (1000, 1000))
self.fonts: dict[tuple[str, int], Font] = {
("default", 8): ImageFont.load_default(8)
}
self.difference: PixelMap = PixelMap({})
self._last_update = 0
self._shutdown: bool = False
self.avoid: set[int] = set()
self.animations: list[Animation] = []
self._written_boxes = 0
self._read_boxes = 0
self._last_printout = time_now()
self._active: set[int] = set()
@staticmethod
def get_text_image(text: str, font: ImageFont.ImageFont | ImageFont.FreeTypeFont) -> Image.Image:
left, top, right, bottom = font.getbbox(text)
with Image.new("LA", (int(right - left) + 4, int(bottom - top) + 4), 0) as im:
draw = ImageDraw.Draw(im)
draw.rectangle((0, 0, im.width, im.height), (0, 0))
draw.text((left + 2, top + 2), text, font=font, fill=(255, 0))
alpha = im.convert("L").filter(ImageFilter.MaxFilter(5))
im.putalpha(alpha)
return im.copy()
def get_font(self, font_name: str, size: int = 8):
if (font := self.fonts.get((font_name, size))):
return font
if font_name == "default":
font = ImageFont.load_default(size)
else:
font = ImageFont.truetype(font_name, size)
self.fonts[font_name, size] = font
return font
def put_text(self, x: int, y: int, text: str, font: str, size: int = 8):
self.put_image(x, y, self.get_text_image(text, self.get_font(font, size)))
def put_image(self, ox: int, oy: int, im: Image.Image):
for y in range(im.height):
for x in range(im.width):
l, a = im.getpixel((x, y)) # type: ignore
index = x + ox + (y + oy) * 1000
if a:
self.put_bit(index, l > 0)
def put_pixel(self, x: int, y: int, val: bool):
self.put_bit(x + y * 1000, val)
def put_bit(self, index: int, value: bool):
if index not in self.avoid:
self.difference[index] = value
def add_animation(
self, ox: int, oy: int, frames: list[Image.Image], spf: float = 1
) -> None:
animation = Animation(([], spf))
alpha = Image.new("L", frames[0].size, 0)
for frame in frames:
frame_la = frame.convert("LA")
frame_alpha = frame_la.getchannel("A")
alpha = ImageChops.add(alpha, frame_alpha)
for frame in frames:
frame_la = frame.convert("LA")
pixelmap = PixelMap({})
for y in range(frame_la.height):
for x in range(frame_la.width):
l, a = frame_la.getpixel((x, y)) # type: ignore
ga: int = alpha.getpixel((x, y)) # type: ignore
index = x + ox + (y + oy) * 1000
if (a > 128 or ga > 128) and index not in self.avoid:
pixelmap[index] = l > 128 if a > 128 else True
animation[0].append(pixelmap)
self.animations.append(animation)
def add_avoid_rect(self, sx: int, sy: int, w: int, h: int) -> None:
for y in range(sy, sy + h):
ox = y * 1000
self.add_avoid_range(sx + ox, sx + w + ox)
def add_avoid_range(self, start: int, stop: int, step: int = 1) -> None:
self.avoid |= set(range(start, stop, step))
def add_avoid_index(self, *indices: int) -> None:
self.avoid |= set(indices)
def get_difference_image(self) -> Image.Image:
with Image.new("LA", (1000, 1000), 0) as im:
for index, expected in self.difference.items():
y, x = divmod(index, 1000)
im.putpixel((x, y), (255 if expected else 0, 255))
return im.copy()
def get_avoid_image(self) -> Image.Image:
with Image.new("RGB", (1000, 1000), (0, 255, 0)) as im:
for index in self.avoid:
y, x = divmod(index, 1000)
im.putpixel((x, y), (255, 0, 0))
return im.copy()
async def listener(self) -> None:
try:
async with ClientSession() as http:
async with http.get(f"{self.base}/api/initial-state") as req:
data = await req.json()
buffer = b64decode(data["full_state"].encode() + b"=")
self.canvas.paste(Image.frombytes("1", (1000, 1000), buffer))
self._last_update = data["timestamp"]
async with AsyncSimpleClient(http_session=http) as sio:
await sio.connect(f"{self.base}/socket.io")
while not self._shutdown:
event, data = await sio.receive()
if event == "full_state":
buffer = b64decode(data["full_state"].encode() + b"=")
image = Image.frombytes("1", (1000, 1000), buffer)
self.canvas.paste(image)
image.close()
self._last_update = data["timestamp"]
elif event == "batched_bit_toggles":
bits_on, bits_off, timestamp = data
if timestamp < self._last_update:
print("SKIPPING UPDATES: TOO OLD")
else:
self._last_update = timestamp
self._read_boxes += len(bits_on) + len(bits_off)
for ndx in bits_on:
y, x = divmod(ndx, 1000)
self.canvas.putpixel((x, y), 255)
for ndx in bits_off:
y, x = divmod(ndx, 1000)
self.canvas.putpixel((x, y), 0)
else:
print("unknown event", event, data)
now = time_now()
if (now - self._last_printout) > 10:
outgoing = self._written_boxes / (now - self._last_printout)
incoming = self._read_boxes / (now - self._last_printout)
print()
print(f"Incoming: {incoming:7.2f}/s Outgoing: {outgoing:7.2f}/s")
print(f"Alive workers: {len(self._active)}")
n_correct, n_wrong = 0, 0
for index, expected in self.difference.items():
y, x = divmod(index, 1000)
actual = self.canvas.getpixel((x, y)) > 0 # type: ignore
if expected != actual:
n_wrong += 1
else:
n_correct += 1
print(f"Invalid: {n_wrong:4d} Valid: {n_correct:4d}")
self._written_boxes = 0
self._read_boxes = 0
self._active.clear()
self._last_printout = now
for pixmaps, spf in self.animations:
frame_index = int(now / spf)
self.difference.update(
pixmaps[frame_index % len(pixmaps)]
)
except Exception as e:
print(f"Listener died: {e!r}")
self._shutdown = True
raise
async def writer(
self,
bot_index: int,
proxy_url: Optional[str] = None,
delay: float = 0.25,
):
proxy = ProxyConnector.from_url(proxy_url) if proxy_url else None
async with ClientSession(connector=proxy) as http:
async with AsyncSimpleClient(http_session=http) as sio:
await sio.connect(f"{self.base}/socket.io")
offset = randint(0, 1000000)
while not self._shutdown:
diff = list(self.difference.items())
for _ in range(100):
index, expected = diff[offset % len(diff)]
offset += randint(0, 100)
y, x = divmod(index, 1000)
current = self.canvas.getpixel((x, y)) > 0 # type: ignore
if current != expected:
# print(f"[{bot_index:2d}] swap {x:3d} {y:3d}")
self._written_boxes += 1
await sio.emit("toggle_bit", {"index": index})
await asyncio.sleep(delay)
break
self._active.add(bot_index)
await asyncio.sleep(0.01)
async def __aenter__(self):
self._listener_task = asyncio.create_task(self.listener())
return self
async def __aexit__(self, a, b, c):
self._shutdown = True
await self._listener_task
async def amain() -> None:
with open("settings.json", "r") as fp:
settings = load(fp)
async with AsyncBotManager() as mgr:
for avoid in settings["avoid"]:
if avoid["type"] == "rect":
mgr.add_avoid_rect(
avoid["x"], avoid["y"], avoid["w"], avoid["h"]
)
print("AVOID rectangle {w}x{h}+{x}+{y}".format(**avoid))
elif avoid["type"] == "range":
mgr.add_avoid_range(
avoid["start"], avoid["stop"], avoid.get("step", 1)
)
print("AVOID range {start}-{stop}".format(**avoid))
elif avoid["type"] == "image":
with Image.open(avoid["path"]).convert("LA") as im:
assert im.width == 1000 and im.height == 1000
for y in range(im.height):
for x in range(im.width):
l, a = im.getpixel((x, y)) # type: ignore
if a > 128:
mgr.add_avoid_index(x + y * 1000)
print("AVOID image", avoid["path"])
for elem in settings["elements"]:
if elem["type"] == "text":
mgr.put_text(elem["x"], elem["y"], elem["text"], elem.get("font", "default"), elem.get("size", 8))
print("ADD text", elem["x"], elem["y"], elem["text"])
elif elem["type"] == "text_anim":
frames: list[Image.Image] = []
for text in elem["lines"]:
frames.append(mgr.get_text_image(text, mgr.get_font(elem.get("font", "default"), elem.get("size", 8))))
mgr.add_animation(elem["x"], elem["y"], frames, elem["spf"])
for frame in frames:
frame.close()
elif elem["type"] == "image":
with Image.open(elem["path"]).convert("LA") as im:
mgr.put_image(elem["x"], elem["y"], im)
elif elem["type"] == "animation":
with Image.open(elem["path"]) as anim:
frames: list[Image.Image] = []
for frame in ImageSequence.Iterator(anim):
frames.append(frame.copy())
mgr.add_animation(elem["x"], elem["y"], frames, elem["spf"])
for frame in frames:
frame.close()
elif elem["type"] == "rgb111":
ox, oy = elem["x"], elem["y"]
with Image.open(elem["path"]).convert("RGBA") as im:
for y in range(im.height):
for x in range(im.width):
r, g, b, a = im.getpixel((x, y)) # type: ignore
if a < 128:
continue
start_ndx = (x + ox + (y + oy) * 577) * 3
mgr.put_bit(start_ndx, r > 128)
mgr.put_bit(start_ndx + 1, g > 128)
mgr.put_bit(start_ndx + 2, b > 128)
elif elem["type"] == "rgb565":
ox, oy = elem["x"], elem["y"]
with Image.open(elem["path"]).convert("RGBA") as im:
for y in range(im.height):
for x in range(im.width):
r, g, b, a = im.getpixel((x, y)) # type: ignore
if a < 128:
continue
offset: int = (x + ox + (y + oy) * 250) * 16
# pack rgb888 to rgb565
rgb: int = (r >> 3) << 11
rgb |= (g >> 2) << 5
rgb |= (b >> 3)
# write to a bitstream
for i in range(16):
mgr.put_bit(
offset + i, ((rgb << i) & 0x8000) > 0
)
elif elem["type"] == "rect":
for y in range(elem["y"], elem["y"] + elem["h"]):
for x in range(elem["x"], elem["x"] + elem["w"]):
mgr.put_pixel(x, y, elem["fill"])
elif elem["type"] == "blob":
with open(elem["path"], "rb") as fp:
offset = elem["offset"]
length = elem.get("length", 1000000)
written = 0
while (char := fp.read(1)):
byte = char[0]
if written > length:
break
for i in range(8):
if written > length:
break
mgr.put_bit(offset, bool((byte >> (7 - i)) & 1))
written += 1
offset += 1
mgr.get_difference_image().save("result.png")
mgr.get_avoid_image().save("avoid.png")
print("Starting writers...")
if n_proxies := len(settings["proxies"]):
res = await asyncio.gather(
*[
mgr.writer(
i,
settings["proxies"][i % n_proxies],
settings["delay"],
)
for i in range(settings["n_bots"])
],
return_exceptions=True,
)
else:
res = await asyncio.gather(
*[mgr.writer(i) for i in range(settings["n_bots"])],
return_exceptions=True,
)
for ret in res:
print(ret)
if __name__ == "__main__":
asyncio.run(amain())