# Viewer metadata carried over from the original listing: 133 lines, 4.6 KiB, Python.
import queue
import tkinter as tk
from base64 import b64decode
from datetime import datetime
from pathlib import Path
from threading import Thread

from PIL import Image, ImageTk
from requests import get
from socketio import SimpleClient, exceptions

# Two-colour scheme as RGB triples: COLORS[0] is painted for a cleared bit,
# COLORS[1] for a set bit.
COLORS = [(0x33, 0x33, 0x66), (0x96, 0x96, 0xE0)]

# Flat 256-entry RGB palette (768 values) for "P"-mode images: entry 0 is
# COLORS[0], entry 255 is COLORS[1], and the 254 entries in between are black.
COLORS_UNPACKED = COLORS[0] + ((0, 0, 0) * 254) + COLORS[1]

# strftime() template used to name the periodic screenshots.
SCREENSHOTS_TMPL = "./screenshots/%Y%m%d_%H%M%S.webp"

class App(tk.Tk):
    """Tkinter window that renders a 1000x1000 bit grid as one pixel per bit.

    The initial grid is fetched over HTTP from ``{url}/api/initial-state``;
    live changes arrive as Socket.IO events.  A background thread only
    *receives* those events and hands them over through a queue; every
    change to the PIL image and to the Tk widgets happens on the thread
    running ``mainloop``.  Keeping Tk calls off the reader thread means
    ``_close`` can join that thread without a cross-thread Tk call waiting
    on the (blocked) main thread.
    """

    _SIDE = 1000             # grid is _SIDE x _SIDE pixels, index = x + y * _SIDE
    _HTTP_TIMEOUT = 10       # seconds allowed for the initial-state request
    _RECEIVE_TIMEOUT = 1     # seconds; bounds how long the reader ignores _running
    _POLL_MS = 50            # how often the Tk thread drains the event queue
    _FIRST_SAVE_MS = 1000    # delay before the first screenshot
    _SAVE_EVERY_MS = 10000   # delay between subsequent screenshots

    def __init__(self, url: str = "https://onemillioncheckboxes.com") -> None:
        """Build the window and canvas; no network traffic happens here.

        Args:
            url: Base URL of the server (no trailing slash).
        """
        super().__init__()
        self.url = url
        self.title("1M pixels")

        self.sio = SimpleClient()

        side = self._SIDE
        self._canvas = tk.Canvas(self)
        self._canvas.config(width=side, height=side, borderwidth=0, highlightthickness=0)
        self._canvas.pack()
        self.config(width=side, height=side, borderwidth=0, highlightthickness=0)

        # Timestamp of the newest state applied so far; _save_image treats it
        # as milliseconds since the epoch.
        self._last_update = 0
        self._running = False
        self._image = Image.new("RGB", (side, side), COLORS[0])

        # Created in start(); None until then so helpers can guard on it.
        self._tk_image = None
        self._tk_image_id = None
        self._reader_thr = None

        # Events received by the reader thread, consumed by the Tk thread.
        self._events = queue.SimpleQueue()

        self._canvas.bind("<Motion>", self._on_mouse_move)
        self._canvas.bind("<Button>", self._on_mouse_click)
        self.bind("<KeyPress>", self._on_key_down)

    def _on_mouse_move(self, event: tk.Event) -> None:
        """Show the pointer's canvas coordinates in the window title."""
        self.title(f"1M pixels: {event.x}, {event.y}")

    def _on_mouse_click(self, event: tk.Event) -> None:
        """Ask the server to toggle the bit under the pointer."""
        x, y = event.x, event.y
        if not (0 <= x < self._SIDE and 0 <= y < self._SIDE):
            # Outside the grid: x + y * _SIDE would address a different bit
            # (or none at all), so send nothing.
            return
        self.sio.emit("toggle_bit", {"index": x + y * self._SIDE})

    def _save_image(self) -> None:
        """Write the current image to disk, then schedule the next save.

        The file name is SCREENSHOTS_TMPL formatted with ``_last_update``.
        The next save is scheduled even when this one fails, so a single
        error does not end the screenshot series.
        """
        try:
            ts = datetime.fromtimestamp(self._last_update / 1000)
            path = ts.strftime(SCREENSHOTS_TMPL)
            # Image.save() does not create missing directories.
            Path(path).parent.mkdir(parents=True, exist_ok=True)
            self._image.save(path)
            print("SAVED", path)
        finally:
            self.after(self._SAVE_EVERY_MS, self._save_image)

    def _fetch_initial_state(self) -> dict:
        """Download and return the JSON document at /api/initial-state.

        Raises:
            requests.RequestException: on connection errors, timeouts or a
                non-success HTTP status.
        """
        with get(f"{self.url}/api/initial-state", timeout=self._HTTP_TIMEOUT) as resp:
            resp.raise_for_status()
            return resp.json()

    def _apply_full_state(self, data: dict) -> None:
        """Replace the whole image with the bitmap in ``data["full_state"]``.

        ``full_state`` is base64 text decoding to a 1-bit-per-pixel bitmap of
        _SIDE x _SIDE pixels.  The text may arrive without its trailing "="
        padding, so the padding is restored before decoding.
        """
        encoded = data["full_state"]
        padded = encoded + "=" * (-len(encoded) % 4)
        bitmap = Image.frombytes("1", (self._SIDE, self._SIDE), b64decode(padded))
        # Recolour through a palette image: COLORS_UNPACKED maps palette
        # entry 0 to COLORS[0] and entry 255 to COLORS[1].
        paletted = bitmap.convert("P")
        paletted.putpalette(COLORS_UNPACKED)
        self._image.paste(paletted.convert("RGB"))

    def _refresh_canvas(self) -> None:
        """Copy the PIL image into the Tk image shown on the canvas."""
        if self._tk_image is not None:
            self._tk_image.paste(self._image)

    def _on_key_down(self, event: tk.Event) -> None:
        """Re-download the full grid when "r" is pressed."""
        # <KeyPress event state=Control keysym=r keycode=27 char='\x12' x=538 y=556>
        if event.keysym != "r":
            return
        print("FULL REFRESH")
        data = self._fetch_initial_state()
        self._apply_full_state(data)
        self._last_update = data["timestamp"]
        # Repaint immediately instead of waiting for the next live update.
        self._refresh_canvas()
        print("FULL REFRESH DONE")

    def _reader(self) -> None:
        """Background thread: forward Socket.IO events to the Tk thread.

        Runs until ``_running`` is cleared, then disconnects the client.
        Nothing here touches Tk or the image.
        """
        try:
            while self._running:
                try:
                    # The timeout makes the loop re-check _running regularly
                    # even when the server is silent.
                    event = self.sio.receive(timeout=self._RECEIVE_TIMEOUT)
                except exceptions.TimeoutError:
                    continue
                self._events.put(event)
        finally:
            self.sio.disconnect()
            print("_reader exited")

    def _handle_event(self, event) -> bool:
        """Apply one received event to the image.

        Args:
            event: ``[name, *args]`` as returned by ``SimpleClient.receive``.

        Returns:
            True when the image changed and the canvas needs a repaint.
        """
        name, *args = event
        data = args[0] if args else None

        if name == "batched_bit_toggles":
            bits_on, bits_off, timestamp = data
            if timestamp < self._last_update:
                print("SKIP partial updates: too old")
                return False
            self._last_update = timestamp
            for color, indices in ((COLORS[1], bits_on), (COLORS[0], bits_off)):
                for ndx in indices:
                    y, x = divmod(ndx, self._SIDE)
                    self._image.putpixel((x, y), color)
            print("partial update", len(bits_on), len(bits_off))
            return True

        if name == "full_state":
            print("full update")
            self._apply_full_state(data)
            return True

        print(name, *args)
        return False

    def _drain_events(self) -> None:
        """Tk-thread poller: apply every queued event, repaint once if needed."""
        try:
            dirty = False
            while True:
                try:
                    event = self._events.get_nowait()
                except queue.Empty:
                    break
                if self._handle_event(event):
                    dirty = True
            if dirty:
                self._refresh_canvas()
        finally:
            # Keep polling even if one event was malformed.
            if self._running:
                self.after(self._POLL_MS, self._drain_events)

    def _close(self) -> None:
        """Window-close handler: stop the reader thread, then destroy the window."""
        self._running = False
        print("waiting for reader to close")
        if self._reader_thr is not None:
            self._reader_thr.join()
        self.destroy()

    def start(self) -> None:
        """Connect, load the initial grid, start the reader and run the UI.

        Blocks inside ``mainloop`` until the window is closed.
        """
        # Connect before fetching the snapshot so that updates sent while the
        # snapshot downloads are already being buffered by the client.
        self.sio.connect(f"{self.url}/socket.io")

        data = self._fetch_initial_state()
        self._apply_full_state(data)
        self._last_update = data["timestamp"]

        self._tk_image = ImageTk.PhotoImage(self._image)
        self._tk_image_id = self._canvas.create_image(
            0, 0, anchor="nw", image=self._tk_image
        )

        self._running = True
        self._reader_thr = Thread(target=self._reader, name="Reader thread")
        self._reader_thr.start()

        self.protocol("WM_DELETE_WINDOW", self._close)
        self.after(self._FIRST_SAVE_MS, self._save_image)
        self.after(self._POLL_MS, self._drain_events)
        try:
            self.mainloop()
        finally:
            # If mainloop ends without _close() having run, still let the
            # reader thread finish.
            self._running = False

# Script entry point: build the window and run it until it is closed.
if __name__ == "__main__":
    app = App()
    app.start()