142 lines
4.5 KiB
Python
142 lines
4.5 KiB
Python
import functools
|
|
import json
|
|
from multiprocessing.shared_memory import SharedMemory
|
|
import asyncio
|
|
import random
|
|
from typing import NamedTuple, Optional
|
|
from aiohttp_socks import ProxyConnector
|
|
import socketio
|
|
import aiohttp
|
|
import time
|
|
|
|
OFFSET_STATE = 0
|
|
OFFSET_AVOID = 125000
|
|
OFFSET_CANVAS = 250000
|
|
OFFSET_MASK = 375000
|
|
|
|
|
|
class PixelState(NamedTuple):
|
|
state: bool
|
|
avoid: bool
|
|
canvas: bool
|
|
mask: bool
|
|
|
|
|
|
class WorkerManager:
|
|
def __init__(self, shmem_name: str = "omcb-bot"):
|
|
self.shmem_name = shmem_name
|
|
self.base = "https://onemillioncheckboxes.com"
|
|
self.delay = 0.25
|
|
self.queue: asyncio.Queue[int] = asyncio.Queue(128)
|
|
|
|
self.n_toggles = 0
|
|
self.workers: set[tuple[int, int]] = set()
|
|
|
|
async def queue_manager(self):
|
|
offset = random.randint(0, 999999)
|
|
while True:
|
|
for oy in [0, 8, 4, 12, 2, 10, 6, 14, 1, 9, 5, 13, 3, 11, 7, 15]:
|
|
for y in range(oy, 1000, 16):
|
|
for x in range(1000):
|
|
index = (x + y * 1000 + offset) % 1000000
|
|
byte, bit = divmod(index, 8)
|
|
mask = 0x80 >> bit
|
|
|
|
if self.shmem.buf[OFFSET_AVOID + byte] & mask:
|
|
continue
|
|
|
|
if (self.shmem.buf[OFFSET_MASK + byte] & mask) == 0:
|
|
continue
|
|
|
|
if (self.shmem.buf[OFFSET_CANVAS + byte] & mask) != (
|
|
self.shmem.buf[OFFSET_STATE + byte] & mask
|
|
):
|
|
await self.queue.put(index)
|
|
|
|
async def writer(self, bot_index: int, proxy: Optional[str] = None):
|
|
connector = ProxyConnector.from_url(proxy) if proxy else None
|
|
async with aiohttp.ClientSession(connector=connector) as http:
|
|
sio = socketio.AsyncClient(http_session=http)
|
|
|
|
async def writer_itself():
|
|
while not sio.connected:
|
|
await asyncio.sleep(0.1)
|
|
cookie = bot_index, random.randint(0, 9999)
|
|
self.workers.add(cookie)
|
|
try:
|
|
while sio.connected:
|
|
index = await self.queue.get()
|
|
byte, bit = divmod(index, 8)
|
|
mask = 0x80 >> bit
|
|
|
|
if self.shmem.buf[OFFSET_AVOID + byte] & mask:
|
|
continue
|
|
|
|
if (self.shmem.buf[OFFSET_MASK + byte] & mask) == 0:
|
|
continue
|
|
|
|
if (self.shmem.buf[OFFSET_CANVAS + byte] & mask) != (
|
|
self.shmem.buf[OFFSET_STATE + byte] & mask
|
|
):
|
|
byte, bit = divmod(index, 8)
|
|
self.n_toggles += 1
|
|
self.shmem.buf[OFFSET_STATE + byte] ^= 0x80 >> bit
|
|
await sio.emit("toggle_bit", {"index": index})
|
|
await asyncio.sleep(self.delay)
|
|
finally:
|
|
self.workers.remove(cookie)
|
|
|
|
sio.on("connect", writer_itself)
|
|
|
|
await sio.connect(self.base.replace("http", "ws"))
|
|
await sio.wait()
|
|
|
|
async def status_display(self):
|
|
last_printout = time.time()
|
|
while True:
|
|
await asyncio.sleep(1)
|
|
diff = time.time() - last_printout
|
|
|
|
print()
|
|
print(f"Workers: {len(self.workers)}")
|
|
print(f"Queue size: {self.queue.qsize()}/{self.queue.maxsize}")
|
|
print(f"Toggles: {self.n_toggles / diff:.2f}/s")
|
|
|
|
self.n_toggles = 0
|
|
last_printout = time.time()
|
|
|
|
async def __aenter__(self):
|
|
self.shmem = SharedMemory(self.shmem_name)
|
|
return self
|
|
|
|
async def __aexit__(self, a, b, c):
|
|
self.shmem.close()
|
|
|
|
|
|
async def main(config_path: str = "worker.json", *_):
|
|
with open(config_path, "r") as fp:
|
|
config = json.load(fp)
|
|
|
|
n_bots = config.get("n_bots", 1)
|
|
|
|
async with WorkerManager(config.get("shmem", "omcb-bot")) as mgr:
|
|
|
|
mgr.delay = config.get("delay", mgr.delay)
|
|
|
|
workers = []
|
|
if proxies := config.get("proxy", []):
|
|
workers.extend([mgr.writer(i, proxies[i % len(proxies)]) for i in range(n_bots)])
|
|
else:
|
|
workers.extend([mgr.writer(i) for i in range(n_bots)])
|
|
|
|
await asyncio.gather(
|
|
mgr.queue_manager(),
|
|
mgr.status_display(),
|
|
*workers
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
from sys import argv
|
|
asyncio.run(main(*argv[1:]))
|