from core.ecs import Component
import asyncio
import threading
import json
import queue
class WebSocketComponent(Component):
    """
    WebSocket component supporting both server and client modes.

    Modes:
        - "server": Listens on host:port, accepts client connections.
        - "client": Connects to a remote WebSocket server.

    The network I/O runs on an asyncio event loop inside a daemon thread
    created by ``start()``. Incoming messages are placed on a bounded queue
    and handed to the caller's thread through ``poll()``.

    Usage in scripts::

        # Access the component
        ws = self.entity.get_component(WebSocketComponent)

        # Start the connection (call once, e.g. in on_start)
        ws.start()

        # In on_update, poll for incoming messages
        for sender, message in ws.poll():
            print(f"Received: {message}")

        # Send data
        ws.send("Hello")               # Client: send to server. Server: broadcast to all.
        ws.send_json({"key": "val"})   # Send a JSON-serializable dict.
        ws.broadcast("Hi everyone")    # Server only: broadcast to all clients.
        ws.send_to(client_id, "Hi")    # Server only: send to a specific client.

        # Stop cleanly
        ws.stop()
    """

    MODE_SERVER = "server"
    MODE_CLIENT = "client"
    _VALID_MODES = {MODE_SERVER, MODE_CLIENT}

    def __init__(
        self,
        mode: str = "client",
        host: str = "localhost",
        port: int = 8765,
        url: str = "",
        autostart: bool = False,
        max_queue_size: int = 1024,
    ):
        """
        Create the component without opening any connection.

        Args:
            mode: "server" or "client"; any other value falls back to "client".
            host: Host to bind (server) or connect to (client). Empty values
                become "localhost".
            port: TCP port, clamped to the range 1..65535.
            url: Full WebSocket URL. In client mode a non-empty url takes
                precedence over host/port.
            autostart: Stored as a flag only; this class never reads it.
                NOTE(review): presumably consumed by an external system
                together with ``_autostart_handled`` -- confirm.
            max_queue_size: Capacity of the inbox queue (minimum 1). Messages
                arriving while the inbox is full are dropped.
        """
        self.entity = None
        self.mode = mode if mode in self._VALID_MODES else self.MODE_CLIENT
        self.host = str(host or "localhost")
        self.port = max(1, min(65535, int(port)))
        self.url = str(url or "")
        self.autostart = bool(autostart)
        self.max_queue_size = max(1, int(max_queue_size))

        # Runtime state (not serialized)
        self._loop: asyncio.AbstractEventLoop | None = None
        self._thread: threading.Thread | None = None
        self._main_task: asyncio.Task | None = None
        self._inbox: queue.Queue = queue.Queue(maxsize=self.max_queue_size)
        self._running = False
        self._started = False
        self._autostart_handled = False

        # Server state
        self._server = None
        self._clients: dict[int, object] = {}  # id -> websocket
        self._next_client_id = 1

        # Client state
        self._ws = None

        # Callbacks (set from user scripts).
        # NOTE(review): nothing in this class invokes these callbacks;
        # presumably another part of the engine does -- confirm.
        self.on_message_callback = None
        self.on_connect_callback = None
        self.on_disconnect_callback = None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def start(self):
        """Start the WebSocket server or client in a background thread.

        Does nothing if the component has already been started and not
        stopped since.
        """
        if self._started:
            return
        self._started = True
        self._running = True
        loop = asyncio.new_event_loop()
        self._loop = loop
        # The loop is handed to the thread explicitly so the thread never has
        # to re-read self._loop, which stop() may reset concurrently.
        self._thread = threading.Thread(
            target=self._run_loop, args=(loop,), daemon=True
        )
        self._thread.start()

    def stop(self):
        """Stop the WebSocket server or client cleanly.

        The main coroutine is cancelled so that its cleanup code (closing the
        server, leaving the client connection context) actually runs. If the
        background thread has not finished after 2 seconds the event loop is
        stopped forcibly.
        """
        if not self._started:
            return
        self._running = False
        loop = self._loop
        thread = self._thread

        if loop is not None:
            try:
                loop.call_soon_threadsafe(self._cancel_main_task, loop)
            except RuntimeError:
                pass  # the loop has already been closed by its thread

        # Never join the current thread (join() would raise RuntimeError).
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=2.0)
            if thread.is_alive() and loop is not None:
                # Graceful shutdown did not complete in time: force it.
                try:
                    loop.call_soon_threadsafe(loop.stop)
                except RuntimeError:
                    pass
                thread.join(timeout=0.5)

        self._started = False
        self._loop = None
        self._thread = None
        self._server = None
        self._clients.clear()
        self._ws = None

    def is_running(self) -> bool:
        """Return True if the WebSocket is actively running."""
        return self._running and self._started

    def poll(self) -> list[tuple]:
        """
        Drain the inbox and return a list of (sender, message) tuples.

        Call this in on_update to process incoming messages on the game thread.
        For server mode, sender is the client_id (int).
        For client mode, sender is "server".
        Connection/disconnection events use sender "system" with message
        dicts like {"event": "connected", "client_id": id} or
        {"event": "disconnected", "client_id": id} in server mode, and
        {"event": ..., "url": url} (plus "reason" on disconnect) in client mode.
        """
        drained = []
        while True:
            try:
                drained.append(self._inbox.get_nowait())
            except queue.Empty:
                return drained

    def send(self, message: str):
        """
        Client mode: Send a message to the server.
        Server mode: Broadcast a message to all connected clients.

        Silently does nothing when the component is not running.
        """
        if not self._running or not self._loop:
            return
        if self.mode == self.MODE_CLIENT:
            self._schedule(self._client_send(message))
        else:
            self._schedule(self._server_broadcast(message))

    def send_json(self, data):
        """Send a JSON-serializable object as a string message.

        Serialization failures are reported on stdout and nothing is sent.
        """
        try:
            payload = json.dumps(data)
        except (TypeError, ValueError) as e:
            print(f"[WebSocket] Failed to serialize JSON: {e}")
            return
        self.send(payload)

    def broadcast(self, message: str):
        """Server only: Broadcast a message to all connected clients."""
        if self.mode != self.MODE_SERVER or not self._running or not self._loop:
            return
        self._schedule(self._server_broadcast(message))

    def send_to(self, client_id: int, message: str):
        """Server only: Send a message to a specific client by ID."""
        if self.mode != self.MODE_SERVER or not self._running or not self._loop:
            return
        self._schedule(self._server_send_to(client_id, message))

    def get_client_count(self) -> int:
        """Server only: Return the number of connected clients."""
        return len(self._clients)

    def get_client_ids(self) -> list[int]:
        """Server only: Return a list of connected client IDs."""
        return list(self._clients)

    def get_url(self) -> str:
        """Return the effective WebSocket URL.

        In client mode a non-empty ``url`` wins; otherwise the URL is built
        from host and port.
        """
        if self.mode == self.MODE_CLIENT and self.url:
            return self.url
        return f"ws://{self.host}:{self.port}"

    # ------------------------------------------------------------------
    # Internal: event loop
    # ------------------------------------------------------------------

    def _run_loop(self, loop=None):
        """Entry point for the background thread.

        Runs the server or client coroutine to completion on ``loop``
        (defaults to ``self._loop``), then cancels leftover tasks and closes
        the loop.
        """
        if loop is None:
            loop = self._loop
        asyncio.set_event_loop(loop)

        if self.mode == self.MODE_SERVER:
            main = self._run_server()
        else:
            main = self._run_client()
        task = loop.create_task(main)
        self._main_task = task

        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            pass  # stop() cancelled the main task: normal shutdown
        except Exception as e:
            if self._running:
                print(f"[WebSocket] Loop error: {e}")
        finally:
            try:
                # Cancel whatever is still scheduled (e.g. queued sends) so
                # the loop is not closed with pending tasks. The wait is
                # bounded so a misbehaving task cannot hang the thread.
                pending = asyncio.all_tasks(loop)
                for leftover in pending:
                    leftover.cancel()
                if pending:
                    loop.run_until_complete(asyncio.wait(pending, timeout=1.0))
                loop.run_until_complete(loop.shutdown_asyncgens())
            except Exception:
                pass
            loop.close()
            if self._main_task is task:
                self._main_task = None
            # Reflect that nothing is running any more, unless a newer
            # start() has already replaced the loop.
            if self._loop is loop:
                self._running = False

    def _cancel_main_task(self, loop):
        """Cancel the main coroutine's task; runs on the event loop thread."""
        task = self._main_task
        if task is not None and not task.done() and task.get_loop() is loop:
            task.cancel()

    def _schedule(self, coro):
        """Schedule a coroutine on the event loop from the game thread.

        If the loop is not available the coroutine is closed so that it does
        not trigger a "coroutine was never awaited" warning.
        """
        loop = self._loop
        if loop is not None and loop.is_running():
            try:
                asyncio.run_coroutine_threadsafe(coro, loop)
                return
            except RuntimeError:
                pass  # loop was closed between the check and the call
        coro.close()

    def _enqueue(self, sender, message):
        """Put a message into the inbox queue (thread-safe)."""
        try:
            self._inbox.put_nowait((sender, message))
        except queue.Full:
            pass  # inbox is full: the new message is dropped

    # ------------------------------------------------------------------
    # Server mode
    # ------------------------------------------------------------------

    async def _run_server(self):
        """Serve on host:port until ``_running`` is cleared or the task is cancelled."""
        try:
            import websockets
        except ImportError:
            print("[WebSocket] 'websockets' package not installed. Run: pip install websockets")
            return

        async def handler(websocket):
            # Register the connection under a fresh id and announce it.
            client_id = self._next_client_id
            self._next_client_id += 1
            self._clients[client_id] = websocket
            self._enqueue("system", {"event": "connected", "client_id": client_id})
            try:
                async for message in websocket:
                    self._enqueue(client_id, message)
            except websockets.exceptions.ConnectionClosed:
                pass
            finally:
                self._clients.pop(client_id, None)
                self._enqueue("system", {"event": "disconnected", "client_id": client_id})

        try:
            self._server = await websockets.serve(handler, self.host, self.port)
            print(f"[WebSocket] Server started on ws://{self.host}:{self.port}")
            # Keep running until stopped
            while self._running:
                await asyncio.sleep(0.1)
        except Exception as e:
            print(f"[WebSocket] Server error: {e}")
        finally:
            if self._server:
                self._server.close()
                await self._server.wait_closed()
                print("[WebSocket] Server stopped")

    async def _server_broadcast(self, message: str):
        """Send ``message`` to every connected client, dropping clients whose send fails."""
        for client_id, ws in list(self._clients.items()):
            try:
                await ws.send(message)
            except Exception:
                self._clients.pop(client_id, None)

    async def _server_send_to(self, client_id: int, message: str):
        """Send ``message`` to one client; unknown ids are ignored."""
        ws = self._clients.get(client_id)
        if ws is None:
            return
        try:
            await ws.send(message)
        except Exception:
            self._clients.pop(client_id, None)

    # ------------------------------------------------------------------
    # Client mode
    # ------------------------------------------------------------------

    async def _run_client(self):
        """Connect to the target URL and reconnect with exponential backoff.

        The retry delay starts at 1 second, doubles after each disconnect up
        to 30 seconds, and resets to 1 second after a successful connect.
        """
        try:
            import websockets
        except ImportError:
            print("[WebSocket] 'websockets' package not installed. Run: pip install websockets")
            return

        target_url = self.url or f"ws://{self.host}:{self.port}"
        retry_delay = 1.0
        max_retry_delay = 30.0

        while self._running:
            # Used when the receive loop ends without raising, i.e. the
            # connection was closed without an error.
            reason = "connection closed"
            try:
                async with websockets.connect(target_url) as ws:
                    self._ws = ws
                    retry_delay = 1.0
                    self._enqueue("system", {"event": "connected", "url": target_url})
                    print(f"[WebSocket] Client connected to {target_url}")
                    async for message in ws:
                        self._enqueue("server", message)
            except Exception as e:
                reason = str(e)
            finally:
                self._ws = None

            if not self._running:
                break
            # Clean and failed disconnects share this path so both produce an
            # event and both wait before reconnecting.
            self._enqueue("system", {"event": "disconnected", "url": target_url, "reason": reason})
            print(f"[WebSocket] Client disconnected: {reason}. Retrying in {retry_delay:.1f}s...")
            await asyncio.sleep(retry_delay)
            retry_delay = min(retry_delay * 2, max_retry_delay)

    async def _client_send(self, message: str):
        """Send ``message`` over the current connection, if there is one."""
        if self._ws:
            try:
                await self._ws.send(message)
            except Exception as e:
                print(f"[WebSocket] Send error: {e}")

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def on_destroy(self):
        """Called when the entity is destroyed."""
        self.stop()