ambientlightdemo/rpi/main.py
2025-12-02 17:37:11 +01:00

73 lines
No EOL
1.9 KiB
Python

import socket
import time
import json
from lights import lights
from stepper import stepper
import gdmath
from readangle import read_angle_f
import threading
# Peer host name and the two UDP ports in use: this process binds
# UDP_PORT_LOCAL and sends its datagrams to UDP_HOST on UDP_PORT_NETWORKED.
UDP_HOST = "steamdeck"
UDP_PORT_LOCAL = 4444
UDP_PORT_NETWORKED = 4433

# One IPv4 UDP socket is shared for sending and receiving.  It is bound on
# all interfaces and switched to non-blocking mode, so recvfrom() raises
# BlockingIOError instead of waiting when nothing is queued.
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
sock.bind(("", UDP_PORT_LOCAL))
sock.setblocking(False)
def send_json(data):
    """Serialize *data* as JSON and send it to the peer as one UDP datagram.

    The datagram goes out through the module-level ``sock`` to
    ``(UDP_HOST, UDP_PORT_NETWORKED)``.  Any exception raised by
    ``json.dumps`` or ``sock.sendto`` propagates to the caller.
    """
    payload = json.dumps(data).encode('utf-8')
    destination = (UDP_HOST, UDP_PORT_NETWORKED)
    sock.sendto(payload, destination)
def receive_json():
    """Return the next queued datagram decoded as JSON, or ``None``.

    Reads at most one datagram (up to 1024 bytes) from the module-level
    ``sock`` without blocking.

    Returns:
        The decoded JSON value, or ``None`` when no datagram is waiting or
        when the datagram is not valid UTF-8 encoded JSON.
    """
    try:
        data, _ = sock.recvfrom(1024)
    except BlockingIOError:
        # Raised by a non-blocking socket when nothing is queued.
        return None

    try:
        return json.loads(data.decode('utf-8'))
    except ValueError:
        # UnicodeDecodeError and json.JSONDecodeError both derive from
        # ValueError.  Anyone on the network can send bytes to this port, so
        # a malformed packet is dropped instead of raising into (and
        # terminating) the receiving thread.
        return None
# State shared between the worker threads, all starting at zero.
# local_*  : latest reading from read_angle_f() and its difference from the
#            previous reading; written by read_thread_fn.
# remote_* : latest "value" / "delta" fields received from the peer; written
#            by network_recv_fn.
local_rotation = local_delta = 0
remote_rotation = remote_delta = 0
def read_thread_fn():
    """Poll the angle source forever and publish every reading.

    Each iteration stores the new reading in ``local_rotation``, stores
    ``gdmath.shortest_diff(previous, new)`` in ``local_delta``, and sends both
    to the peer as ``{"value": ..., "delta": ...}``.  The loop has no sleep
    of its own; it runs as fast as ``read_angle_f`` and ``send_json`` return.
    """
    global local_rotation, local_delta
    while True:
        previous = local_rotation
        current = read_angle_f()
        local_rotation = current
        step = gdmath.shortest_diff(previous, current)
        local_delta = step
        payload = {"value": current, "delta": step}
        send_json(payload)
def lights_fn():
    """Drive the lights forever, sleeping 1/60 s between updates.

    Each iteration:
      * sets ``lights.color`` to
        ``gdmath.sample_color_gradient(local_rotation + remote_rotation)``,
      * copies ``local_delta`` into ``lights.expected_rotation_delta``,
      * calls ``lights.process(delta)``, where ``delta`` is the time in
        seconds between the two most recent timestamps taken right after
        ``process`` returned (1/60 for the first iteration).

    The shared rotation values are only read here, so no ``global``
    declaration is needed.
    """
    frame_interval = 1 / 60
    # time.monotonic() never goes backwards and is unaffected by system
    # clock adjustments.  With time.time(), a clock step (e.g. an NTP sync)
    # would produce a huge or negative delta for one frame.
    last_time = time.monotonic()
    delta = frame_interval
    while True:
        lights.color = gdmath.sample_color_gradient(
            local_rotation + remote_rotation
        )
        lights.expected_rotation_delta = local_delta
        lights.process(delta)
        # The timestamp is taken before sleeping, so the next delta covers
        # this sleep plus the next iteration's processing time.
        current_time = time.monotonic()
        time.sleep(frame_interval)
        delta = current_time - last_time
        last_time = current_time
def network_recv_fn():
    """Receive peer updates forever and publish them to the shared state.

    Every 10 ms all datagrams currently queued on the socket are consumed.
    For each decoded payload that is a dict containing both ``"value"`` and
    ``"delta"``, those fields are copied into ``remote_rotation`` and
    ``remote_delta``.  Any other payload is ignored.
    """
    global remote_rotation, remote_delta
    while True:
        # Drain the whole backlog on each wakeup.  Taking only one datagram
        # per 10 ms lets the queue grow whenever the peer sends faster than
        # 100 packets/s, so the values applied here would lag further and
        # further behind.
        while True:
            data = receive_json()
            if data is None:
                break
            # The payload comes from the network: require the expected shape
            # rather than letting a KeyError/TypeError end this thread.
            if isinstance(data, dict) and "value" in data and "delta" in data:
                remote_rotation = data["value"]
                remote_delta = data["delta"]
        time.sleep(0.01)
# One worker thread per loop function: sensor reader, lights driver and
# network receiver.  They are started in that order, then the main thread
# waits on each of them in the same order.
read_thread, lights_thread, network_thread = (
    threading.Thread(target=worker_fn)
    for worker_fn in (read_thread_fn, lights_fn, network_recv_fn)
)

for worker in (read_thread, lights_thread, network_thread):
    worker.start()

for worker in (read_thread, lights_thread, network_thread):
    worker.join()