onemillioncheckboxes/swarm/worker.py

161 lines
5.2 KiB
Python
Raw Normal View History

2024-07-09 18:09:55 +03:00
import json
2024-07-09 16:26:19 +03:00
from multiprocessing.shared_memory import SharedMemory
import asyncio
import random
from typing import NamedTuple, Optional
from aiohttp_socks import ProxyConnector
import socketio
import aiohttp
2024-07-09 17:09:12 +03:00
import time
2024-07-09 16:26:19 +03:00
OFFSET_STATE = 0
OFFSET_AVOID = 125000
OFFSET_CANVAS = 250000
OFFSET_MASK = 375000
class PixelState(NamedTuple):
state: bool
avoid: bool
canvas: bool
mask: bool
class WorkerManager:
def __init__(self, shmem_name: str = "omcb-bot"):
self.shmem_name = shmem_name
self.base = "https://onemillioncheckboxes.com"
self.delay = 0.25
2024-07-11 04:17:42 +03:00
self.batch_size = 7
2024-07-11 18:55:53 +03:00
self.queue: asyncio.Queue[int] = asyncio.Queue(64)
2024-07-09 17:09:12 +03:00
self.n_toggles = 0
self.workers: set[tuple[int, int]] = set()
2024-07-09 21:11:47 +03:00
self._restarts: dict[int, int] = {}
self.miss_avoid = 0
self.miss_mask = 0
self.miss_state = 0
2024-07-09 16:26:19 +03:00
async def queue_manager(self):
offset = random.randint(0, 1000000)
2024-07-11 04:17:42 +03:00
matrixX = [ 0, 2, 2, 0, 1, 3, 3, 1, 1, 3, 3, 1, 0, 2, 2, 0 ]
matrixY = [ 0, 2, 0, 2, 1, 3, 1, 3, 0, 2, 0, 2, 1, 3, 1, 3 ]
2024-07-09 16:26:19 +03:00
while True:
2024-07-11 04:17:42 +03:00
for ox, oy in zip(matrixX, matrixY):
2024-07-09 21:11:47 +03:00
for y in range(oy, 1000, 4):
2024-07-11 04:17:42 +03:00
for x in range(ox, 1000, 4):
2024-07-09 18:09:55 +03:00
index = (x + y * 1000 + offset) % 1000000
2024-07-09 21:11:47 +03:00
byte, bit = index >> 3, index & 7
2024-07-09 18:09:55 +03:00
mask = 0x80 >> bit
2024-07-09 17:09:12 +03:00
2024-07-09 18:09:55 +03:00
if self.shmem.buf[OFFSET_AVOID + byte] & mask:
continue
2024-07-09 17:09:12 +03:00
2024-07-09 18:09:55 +03:00
if (self.shmem.buf[OFFSET_MASK + byte] & mask) == 0:
continue
2024-07-09 17:09:12 +03:00
2024-07-11 22:11:02 +03:00
if 0 != (self.shmem.buf[OFFSET_CANVAS + byte] & mask):
2024-07-09 18:09:55 +03:00
await self.queue.put(index)
2024-07-11 05:21:04 +03:00
await asyncio.sleep(0.01)
2024-07-09 16:26:19 +03:00
async def writer(self, bot_index: int, proxy: Optional[str] = None):
connector = ProxyConnector.from_url(proxy) if proxy else None
async with aiohttp.ClientSession(connector=connector) as http:
2024-07-09 21:47:57 +03:00
sio = socketio.AsyncClient(http_session=http)
2024-07-09 21:11:47 +03:00
self._restarts[bot_index] = 0
2024-07-09 16:26:19 +03:00
async def writer_itself():
while not sio.connected:
await asyncio.sleep(0.1)
2024-07-09 21:11:47 +03:00
self._restarts[bot_index] += 1
cookie = bot_index, self._restarts[bot_index]
self.workers.add(cookie)
2024-07-11 04:17:42 +03:00
2024-07-11 18:55:53 +03:00
# await sio.emit("unsubscribe")
2024-07-11 05:44:21 +03:00
try:
2024-07-11 04:17:42 +03:00
batch = 0
while sio.connected:
index = await self.queue.get()
2024-07-09 21:11:47 +03:00
byte, bit = index >> 3, index & 7
mask = 0x80 >> bit
2024-07-09 17:09:12 +03:00
if (self.shmem.buf[OFFSET_CANVAS + byte] & mask) != (
self.shmem.buf[OFFSET_STATE + byte] & mask
):
await sio.emit("toggle_bit", {"index": index})
2024-07-11 19:07:30 +03:00
# self.shmem.buf[OFFSET_STATE + byte] ^= mask
2024-07-11 04:17:42 +03:00
self.n_toggles += 1
self.queue.task_done()
2024-07-11 04:17:42 +03:00
batch += 1
if batch >= self.batch_size:
await asyncio.sleep(self.delay)
batch = 0
else:
self.miss_state += 1
finally:
self.workers.remove(cookie)
2024-07-09 16:26:19 +03:00
sio.on("connect", writer_itself)
2024-07-11 05:47:01 +03:00
sio.on("unsubscribed", print)
2024-07-09 16:26:19 +03:00
await sio.connect(self.base.replace("http", "ws"))
await sio.wait()
2024-07-09 17:09:12 +03:00
async def status_display(self):
last_printout = time.time()
while True:
await asyncio.sleep(1)
diff = time.time() - last_printout
print()
2024-07-10 00:29:12 +03:00
print(f"Workers: {len(self.workers)} {self.workers}")
2024-07-09 17:09:12 +03:00
print(f"Queue size: {self.queue.qsize()}/{self.queue.maxsize}")
2024-07-11 05:21:04 +03:00
print(f"Toggles: {self.n_toggles / diff:.2f}/s EST: {self.batch_size * len(self.workers) / self.delay}")
print(f"Misses: A:{self.miss_avoid} M:{self.miss_mask} S:{self.miss_state}")
print(f"Q: {self.queue}")
2024-07-09 17:09:12 +03:00
self.n_toggles = 0
last_printout = time.time()
2024-07-09 16:26:19 +03:00
async def __aenter__(self):
self.shmem = SharedMemory(self.shmem_name)
return self
async def __aexit__(self, a, b, c):
2024-07-11 04:17:42 +03:00
pass
2024-07-09 16:26:19 +03:00
2024-07-09 18:09:55 +03:00
async def main(config_path: str = "worker.json", *_):
with open(config_path, "r") as fp:
config = json.load(fp)
n_bots = config.get("n_bots", 1)
async with WorkerManager(config.get("shmem", "omcb-bot")) as mgr:
mgr.delay = config.get("delay", mgr.delay)
2024-07-11 04:17:42 +03:00
mgr.batch_size = config.get("batch", mgr.batch_size)
2024-07-09 18:09:55 +03:00
workers = []
if proxies := config.get("proxy", []):
2024-07-09 21:11:47 +03:00
workers.extend(
[
mgr.writer(i, proxies[i % len(proxies)])
for i in range(n_bots)
]
)
2024-07-09 18:09:55 +03:00
else:
workers.extend([mgr.writer(i) for i in range(n_bots)])
2024-07-09 16:26:19 +03:00
await asyncio.gather(
2024-07-09 21:11:47 +03:00
mgr.queue_manager(), mgr.status_display(), *workers
2024-07-09 16:26:19 +03:00
)
if __name__ == "__main__":
2024-07-09 18:09:55 +03:00
from sys import argv
2024-07-09 21:11:47 +03:00
2024-07-09 18:09:55 +03:00
asyncio.run(main(*argv[1:]))