onemillioncheckboxes/async-bot.py

271 lines
11 KiB
Python
Raw Normal View History

2024-07-03 22:01:42 +03:00
import asyncio
2024-07-04 20:37:47 +03:00
from typing import NewType, Optional
2024-07-03 22:01:42 +03:00
from socketio import AsyncSimpleClient
from aiohttp import ClientSession
from aiohttp_socks import ProxyConnector
2024-07-04 20:37:47 +03:00
from PIL import Image, ImageFont, ImageDraw, ImageFilter, ImageSequence
2024-07-03 22:01:42 +03:00
from base64 import b64decode
from random import choice
from json import load
2024-07-04 20:37:47 +03:00
from time import time as time_now
# Maps a linear checkbox index (x + y * 1000 at the call sites in this file)
# to the state that checkbox should have: True = checked, False = unchecked.
PixelMap = NewType("PixelMap", dict[int, bool])
# A pair of (frames, spf): one PixelMap per frame plus the number of seconds
# each frame stays active before the next one is selected.
Animation = NewType("Animation", tuple[list[PixelMap], float])
2024-07-03 22:01:42 +03:00
2024-07-04 16:16:59 +03:00
2024-07-03 22:01:42 +03:00
class AsyncBotManager:
    """Mirror the checkbox board locally and steer bots toward a target picture.

    The manager keeps three pieces of state:

    * ``canvas`` - a 1-bit image holding the board state as last reported by
      the server (filled by :meth:`listener`).
    * ``difference`` - the desired state of individual checkboxes, keyed by
      linear index ``x + y * 1000``.
    * ``avoid`` - indices that must never be written to ``difference``.

    Used as an async context manager, it starts :meth:`listener` as a
    background task on entry and waits for it on exit.
    """

    # Side length of the square board and the total number of checkboxes.
    _SIDE = 1000
    _CELLS = _SIDE * _SIDE

    def __init__(self, base: str = "https://onemillioncheckboxes.com"):
        """Create an empty board mirror talking to the site at ``base``."""
        self.base = base
        self.canvas = Image.new("1", (self._SIDE, self._SIDE))
        self.font = ImageFont.load_default(8)
        self.difference: PixelMap = PixelMap({})
        # Timestamp of the newest server update applied to ``canvas``.
        self._last_update = 0
        self._shutdown: bool = False
        self.avoid: set[int] = set()
        self.animations: list[Animation] = []

    def put_text(self, x: int, y: int, text: str):
        """Render ``text`` with ``self.font`` and plan it at board position (x, y).

        The text is drawn white on a scratch image that is 12 px wider than
        the text and 16 px high. The alpha channel is then replaced by the
        luminance dilated with a 3x3 max filter, so every glyph pixel plus a
        one-pixel border around it becomes opaque: glyph pixels are planned
        as checked and the border as unchecked.
        """
        with Image.new("LA", (int(self.font.getlength(text) + 12), 16)) as im:
            draw = ImageDraw.Draw(im)
            draw.rectangle((0, 0, im.width, im.height), (0, 0))
            draw.text((6, 5), text, font=self.font, fill=(255, 0))
            alpha = im.convert("L").filter(ImageFilter.MaxFilter(3))
            im.putalpha(alpha)
            self.put_image(x, y, im)

    def put_image(self, ox: int, oy: int, im: "Image.Image"):
        """Plan every non-transparent pixel of ``im`` with its origin at (ox, oy).

        ``im`` must yield ``(luminance, alpha)`` pairs from ``getpixel`` (mode
        "LA"). Pixels with zero alpha are skipped; the others are planned as
        checked when their luminance is non-zero.
        """
        for y in range(im.height):
            for x in range(im.width):
                l, a = im.getpixel((x, y))  # type: ignore
                if a:
                    self.put_index(x + ox + (y + oy) * self._SIDE, l > 0)

    def put_pixel(self, x: int, y: int, val: bool):
        """Plan the checkbox at column ``x``, row ``y`` to have state ``val``."""
        self.put_index(x + y * self._SIDE, val)

    def put_index(self, index: int, value: bool):
        """Plan the checkbox at linear ``index`` to have state ``value``.

        Indices listed in ``self.avoid`` are ignored, and so are indices
        outside the board: storing those would make ``writer`` and
        ``get_difference_image`` fail later when they address the canvas.
        """
        if not 0 <= index < self._CELLS:
            return
        if index not in self.avoid:
            self.difference[index] = value

    def add_animation(
        self, ox: int, oy: int, image: "Image.Image", spf: float = 1
    ) -> None:
        """Register every frame of ``image`` as an animation at (ox, oy).

        Each frame is converted to "LA"; pixels with alpha above 128 are kept
        and are planned as checked when their luminance is above 128. Avoided
        and off-board indices are dropped. ``spf`` is the number of seconds
        each frame stays active.
        """
        frames: list[PixelMap] = []
        for frame in ImageSequence.Iterator(image):
            frame_la = frame.convert("LA")
            pixelmap = PixelMap({})
            for y in range(frame_la.height):
                for x in range(frame_la.width):
                    l, a = frame_la.getpixel((x, y))  # type: ignore
                    index = x + ox + (y + oy) * self._SIDE
                    if (
                        a > 128
                        and 0 <= index < self._CELLS
                        and index not in self.avoid
                    ):
                        pixelmap[index] = l > 128
            frames.append(pixelmap)
        self.animations.append(Animation((frames, spf)))

    def add_avoid_rect(self, sx: int, sy: int, w: int, h: int) -> None:
        """Protect the ``w`` x ``h`` rectangle whose top-left corner is (sx, sy)."""
        for y in range(sy, sy + h):
            row_start = y * self._SIDE
            self.add_avoid_range(sx + row_start, sx + w + row_start)

    def add_avoid_range(self, start: int, stop: int, step: int = 1) -> None:
        """Protect the linear indices ``range(start, stop, step)``."""
        self.avoid.update(range(start, stop, step))

    def add_avoid_index(self, *indices: int) -> None:
        """Protect each of the given linear indices."""
        self.avoid.update(indices)

    def get_difference_image(self) -> "Image.Image":
        """Return an "LA" picture of the plan.

        Planned-checked boxes are white, planned-unchecked boxes are black,
        and boxes without a plan stay fully transparent.
        """
        with Image.new("LA", (self._SIDE, self._SIDE), 0) as im:
            for index, expected in self.difference.items():
                y, x = divmod(index, self._SIDE)
                im.putpixel((x, y), (255 if expected else 0, 255))
            return im.copy()

    def get_avoid_image(self) -> "Image.Image":
        """Return an RGB picture with avoided boxes in red on a green background."""
        with Image.new("RGB", (self._SIDE, self._SIDE), (0, 255, 0)) as im:
            for index in self.avoid:
                y, x = divmod(index, self._SIDE)
                im.putpixel((x, y), (255, 0, 0))
            return im.copy()

    def _apply_full_state(self, data) -> None:
        """Replace the whole canvas with the snapshot carried by ``data``.

        ``data`` must provide ``"full_state"`` (base64 text) and
        ``"timestamp"``. One ``=`` is appended before decoding - presumably
        the server omits base64 padding; TODO confirm against the API.
        """
        buffer = b64decode(data["full_state"].encode() + b"=")
        # Close the temporary image as soon as it has been copied over.
        with Image.frombytes("1", (self._SIDE, self._SIDE), buffer) as snapshot:
            self.canvas.paste(snapshot)
        self._last_update = data["timestamp"]

    def _apply_toggles(self, bits_on, bits_off, timestamp) -> bool:
        """Apply one batch of toggles; return False if the batch was stale.

        A batch older than the newest state already applied is dropped
        entirely, so it can neither overwrite newer pixels nor move
        ``_last_update`` backwards.
        """
        if timestamp < self._last_update:
            print("SKIPPING UPDATES: TOO OLD")
            return False
        self._last_update = timestamp
        for value, indices in ((255, bits_on), (0, bits_off)):
            for ndx in indices:
                y, x = divmod(ndx, self._SIDE)
                self.canvas.putpixel((x, y), value)
        return True

    def _advance_animations(self) -> None:
        """Merge the currently active frame of every animation into the plan."""
        now = time_now()
        for pixmaps, spf in self.animations:
            frame_index = int(now / spf)
            self.difference.update(pixmaps[frame_index % len(pixmaps)])

    async def listener(self) -> None:
        """Keep ``canvas`` in sync with the server until shutdown is requested.

        Fetches the initial snapshot over HTTP, then consumes socket events:
        ``full_state`` replaces the canvas and ``batched_bit_toggles`` flips
        individual boxes. After every event the animations are advanced. The
        shutdown flag is only checked between events.
        """
        async with ClientSession() as http:
            async with http.get(f"{self.base}/api/initial-state") as req:
                self._apply_full_state(await req.json())
            async with AsyncSimpleClient(http_session=http) as sio:
                await sio.connect(f"{self.base}/socket.io")
                while not self._shutdown:
                    event, data = await sio.receive()
                    if event == "full_state":
                        self._apply_full_state(data)
                    elif event == "batched_bit_toggles":
                        bits_on, bits_off, timestamp = data
                        self._apply_toggles(bits_on, bits_off, timestamp)
                    else:
                        print("unknown event", event, data)
                    self._advance_animations()

    async def writer(
        self,
        bot_index: int,
        proxy_url: Optional[str] = None,
        delay: float = 0.25,
    ):
        """Run one bot that toggles boxes whose state differs from the plan.

        Each round samples up to 100 random planned boxes and toggles the
        first one whose mirrored state is wrong, then sleeps ``delay``
        seconds. Every round ends with a 0.1 s pause. ``bot_index`` is only
        used in log output; ``proxy_url`` optionally routes the connection
        through a proxy.
        """
        proxy = ProxyConnector.from_url(proxy_url) if proxy_url else None
        async with ClientSession(connector=proxy) as http:
            async with AsyncSimpleClient(http_session=http) as sio:
                await sio.connect(f"{self.base}/socket.io")
                while not self._shutdown:
                    diff = list(self.difference.items())
                    # choice() raises IndexError on an empty sequence, so
                    # simply idle while nothing is planned.
                    if diff:
                        for _ in range(100):
                            index, expected = choice(diff)
                            y, x = divmod(index, self._SIDE)
                            current = self.canvas.getpixel((x, y)) > 0  # type: ignore
                            if current != expected:
                                print(f"[{bot_index:2d}] swap {x:3d} {y:3d}")
                                await sio.emit("toggle_bit", {"index": index})
                                await asyncio.sleep(delay)
                                break
                    await asyncio.sleep(0.1)

    async def __aenter__(self):
        """Start the listener as a background task and return the manager."""
        self._listener_task = asyncio.create_task(self.listener())
        return self

    async def __aexit__(self, a, b, c):
        """Request shutdown and wait for the listener task to finish.

        Any exception raised inside the listener propagates from here.
        """
        self._shutdown = True
        await self._listener_task
2024-07-04 20:37:47 +03:00
def _pack_rgb565(r: int, g: int, b: int) -> int:
    """Pack 8-bit channels into a 16-bit word: 5 red, 6 green, 5 blue bits."""
    return ((r >> 3) << 11) | ((g >> 2) << 5) | (b >> 3)


def _blob_bits(data: bytes, length: int) -> list[bool]:
    """Expand ``data`` into at most ``length`` bits, most significant bit first."""
    count = min(max(length, 0), len(data) * 8)
    return [bool((data[i >> 3] >> (7 - (i & 7))) & 1) for i in range(count)]


def _apply_avoid(mgr: "AsyncBotManager", avoid: dict) -> None:
    """Register one entry of the settings' ``avoid`` list on ``mgr``.

    Supported ``type`` values are ``rect``, ``range`` and ``image``; other
    types are ignored. An ``image`` must be exactly 1000x1000, and every
    pixel with alpha above 128 is protected.

    Raises:
        ValueError: if an avoid image does not have the board's size.
    """
    kind = avoid["type"]
    if kind == "rect":
        mgr.add_avoid_rect(avoid["x"], avoid["y"], avoid["w"], avoid["h"])
    elif kind == "range":
        mgr.add_avoid_range(avoid["start"], avoid["stop"], avoid.get("step", 1))
    elif kind == "image":
        # Open and convert in separate context managers so both the source
        # file and the converted copy are closed.
        with Image.open(avoid["path"]) as src, src.convert("LA") as im:
            if im.width != 1000 or im.height != 1000:
                raise ValueError(
                    f"avoid image must be 1000x1000, got {im.width}x{im.height}"
                )
            indices = [
                x + y * 1000
                for y in range(im.height)
                for x in range(im.width)
                if im.getpixel((x, y))[1] > 128  # type: ignore
            ]
        mgr.add_avoid_index(*indices)


def _apply_element(mgr: "AsyncBotManager", elem: dict) -> None:
    """Plan one entry of the settings' ``elements`` list on ``mgr``.

    Supported ``type`` values are ``text``, ``image``, ``animation``,
    ``rgb111``, ``rgb565``, ``rect`` and ``blob``; other types are ignored.
    """
    kind = elem["type"]
    if kind == "text":
        mgr.put_text(elem["x"], elem["y"], elem["text"])
    elif kind == "image":
        with Image.open(elem["path"]) as src, src.convert("LA") as im:
            mgr.put_image(elem["x"], elem["y"], im)
    elif kind == "animation":
        with Image.open(elem["path"]) as anim:
            mgr.add_animation(elem["x"], elem["y"], anim, elem["spf"])
    elif kind == "rgb111":
        # Three checkboxes per pixel (one bit per channel) on a grid that is
        # 577 pixels wide.
        ox, oy = elem["x"], elem["y"]
        with Image.open(elem["path"]) as src, src.convert("RGBA") as im:
            for y in range(im.height):
                for x in range(im.width):
                    r, g, b, a = im.getpixel((x, y))  # type: ignore
                    if a < 128:
                        continue
                    ndx_start = (x + ox + (y + oy) * 577) * 3
                    mgr.put_index(ndx_start, r > 128)
                    mgr.put_index(ndx_start + 1, g > 128)
                    mgr.put_index(ndx_start + 2, b > 128)
    elif kind == "rgb565":
        # Sixteen checkboxes per pixel on a grid that is 250 pixels wide;
        # the packed colour is written least significant bit first.
        ox, oy = elem["x"], elem["y"]
        with Image.open(elem["path"]) as src, src.convert("RGBA") as im:
            for y in range(im.height):
                for x in range(im.width):
                    r, g, b, a = im.getpixel((x, y))  # type: ignore
                    if a < 128:
                        continue
                    ndx_start = (x + ox + (y + oy) * 250) * 16
                    color = _pack_rgb565(r, g, b)
                    for i in range(16):
                        mgr.put_index(ndx_start + i, ((color >> i) & 1) > 0)
    elif kind == "rect":
        for y in range(elem["y"], elem["y"] + elem["h"]):
            for x in range(elem["x"], elem["x"] + elem["w"]):
                mgr.put_pixel(x, y, elem["fill"])
    elif kind == "blob":
        offset = elem["offset"]
        length = elem.get("length", 1000000)
        with open(elem["path"], "rb") as fp:
            # Read only as many bytes as are needed for ``length`` bits.
            data = fp.read((max(length, 0) + 7) // 8)
        for i, bit in enumerate(_blob_bits(data, length)):
            mgr.put_index(offset + i, bit)


async def amain() -> None:
    """Load ``settings.json``, build the plan and run the writer bots.

    The settings provide ``font``, ``avoid``, ``elements``, ``proxies`` and
    ``n_bots``; ``delay`` is optional and falls back to the writer's default
    of 0.25 s. Previews of the plan and of the protected area are saved as
    ``result.png`` and ``avoid.png`` before the bots start. Bots are spread
    round-robin over the configured proxies, or connect directly when the
    proxy list is empty.
    """
    with open("settings.json", "r") as fp:
        settings = load(fp)
    async with AsyncBotManager() as mgr:
        mgr.font = ImageFont.truetype(settings["font"], 8)
        for avoid in settings["avoid"]:
            _apply_avoid(mgr, avoid)
        for elem in settings["elements"]:
            _apply_element(mgr, elem)

        mgr.get_difference_image().save("result.png")
        mgr.get_avoid_image().save("avoid.png")

        proxies = settings["proxies"]
        # Honour the configured delay with and without proxies.
        delay = settings.get("delay", 0.25)
        results = await asyncio.gather(
            *[
                mgr.writer(
                    i,
                    proxies[i % len(proxies)] if proxies else None,
                    delay,
                )
                for i in range(settings["n_bots"])
            ],
            return_exceptions=True,
        )
        # gather() hands failures back as values; report them instead of
        # dropping them silently.
        for i, result in enumerate(results):
            if isinstance(result, BaseException):
                print(f"[{i:2d}] stopped: {result!r}")
2024-07-03 22:01:42 +03:00
if __name__ == "__main__":
    # Script entry point: run the bot inside a fresh event loop.
    asyncio.run(amain())