onemillioncheckboxes/stream-from-stdin.py

37 lines
1012 B
Python
Raw Normal View History

# x-run: cat _junk/logs.txt | python3 %
from base64 import b64decode
from sys import stdout, stdin, stderr
from json import loads
# Size of the bitmap in bytes: 125,000 * 8 = 1,000,000 bits.
CANVAS_SIZE = 125_000

# A frame is written whenever the newest applied timestamp is more than this
# far ahead of the timestamp of the previously written frame.
# NOTE(review): timestamps look like milliseconds (6 s per frame) -- confirm.
FRAME_INTERVAL = 6_000


def _decode_full_state(encoded: str) -> bytes:
    """Decode a base64 snapshot whose trailing ``=`` padding may be missing.

    The input is padded up to the next multiple of four characters, so it
    works for any payload length rather than only the one that happens to
    need exactly one ``=``.
    """
    return b64decode(encoded + "=" * (-len(encoded) % 4))


def apply_event(canvas: bytearray, last_update: int, event: str, data) -> int:
    """Apply a single event to *canvas* in place.

    Args:
        canvas: Bitmap to mutate.
        last_update: Timestamp of the most recently applied event.
        event: Event name; ``"full_state"`` and ``"batched_bit_toggles"``
            are handled, anything else is ignored.
        data: Event payload. For ``"full_state"`` a mapping with the keys
            ``"full_state"`` (base64 text) and ``"timestamp"``; for
            ``"batched_bit_toggles"`` a 3-item sequence
            ``(bits_on, bits_off, timestamp)``.

    Returns:
        The new "last update" timestamp (unchanged if the event was ignored
        or older than *last_update*).
    """
    if event == "full_state":
        snapshot = _decode_full_state(data["full_state"])
        timestamp = data["timestamp"]
        # Slice assignment keeps the same bytearray object; note that it
        # takes on the snapshot's length, whatever that is.
        canvas[:] = snapshot
        return timestamp

    if event == "batched_bit_toggles":
        bits_on, bits_off, timestamp = data
        if timestamp < last_update:
            # Stale batch (older than the state we already have): skip it.
            return last_update
        # NOTE(review): bits are addressed LSB-first within each byte here;
        # confirm this matches the bit order of the full_state snapshot.
        for index in bits_on:
            byte, bit = divmod(index, 8)
            canvas[byte] |= 1 << bit
        for index in bits_off:
            byte, bit = divmod(index, 8)
            canvas[byte] &= 0xFF ^ (1 << bit)
        return timestamp

    return last_update


def stream_frames(lines, out, canvas=None) -> bytearray:
    """Replay logged events from *lines* and write bitmap frames to *out*.

    Each non-blank line is ``<prefix> <json>``, where the JSON part is a
    two-item array ``[event, data]``; the prefix (everything before the first
    space) is ignored. After every processed line, the whole canvas is
    written and flushed to *out* if the last applied timestamp is more than
    ``FRAME_INTERVAL`` ahead of the previously written frame.

    Args:
        lines: Iterable of text lines.
        out: Binary writable object providing ``write`` and ``flush``.
        canvas: Optional bitmap to start from; a zeroed ``CANVAS_SIZE``-byte
            buffer is created when omitted.

    Returns:
        The canvas in its final state.
    """
    if canvas is None:
        canvas = bytearray(CANVAS_SIZE)
    last_update = 0
    last_frame = 0

    for line in lines:
        line = line.strip()
        if not line:
            # Blank lines (e.g. a trailing newline in the log) carry no
            # event; skip them instead of failing on the split below.
            continue
        _, payload = line.split(" ", 1)
        event, data = loads(payload)
        last_update = apply_event(canvas, last_update, event, data)

        if last_update - last_frame > FRAME_INTERVAL:
            out.write(canvas)
            out.flush()
            last_frame = last_update

    return canvas


def main() -> None:
    """Read events from stdin and write raw frames to binary stdout."""
    stream_frames(stdin, stdout.buffer)


if __name__ == "__main__":
    main()