Source code for core.components.webrtc

from core.ecs import Component
import asyncio
import threading
import json
import queue


[docs] class WebRTCComponent(Component): """ WebRTC component for peer-to-peer data channel communication. Supports creating and joining peer connections with data channels for low-latency game networking. Uses aiortc under the hood. A signaling mechanism (e.g. WebSocket) is needed to exchange offers/answers/ICE candidates between peers. Usage in scripts: rtc = self.entity.get_component(WebRTCComponent) # Create an offer (caller side) rtc.create_offer() # Poll for signaling data to send to the remote peer for sender, msg in rtc.poll(): if sender == "local": # msg is a dict like {"type": "offer", "sdp": "..."} or # {"type": "candidate", ...} # Send this to the remote peer via your signaling channel ws.send_json(msg) # On the remote side, set the remote description from received signaling rtc.set_remote_description(offer_dict) # triggers answer creation # Feed ICE candidates from the remote peer rtc.add_ice_candidate(candidate_dict) # Send data over the data channel rtc.send("Hello peer!") rtc.send_json({"action": "move", "x": 10}) # Poll for incoming data channel messages for sender, msg in rtc.poll(): if sender == "datachannel": print("Received:", msg) # Close rtc.close() """ def __init__( self, ice_servers: str = "stun:stun.l.google.com:19302", data_channel_label: str = "game", ordered: bool = True, max_retransmits: int = -1, autostart: bool = False, max_queue_size: int = 1024, ): self.entity = None self.ice_servers = str(ice_servers or "stun:stun.l.google.com:19302") self.data_channel_label = str(data_channel_label or "game") self.ordered = bool(ordered) self.max_retransmits = int(max_retransmits) self.autostart = bool(autostart) self.max_queue_size = max(1, int(max_queue_size)) # Runtime state (not serialized) self._loop: asyncio.AbstractEventLoop | None = None self._thread: threading.Thread | None = None self._inbox: queue.Queue = queue.Queue(maxsize=self.max_queue_size) self._running = False self._started = False self._autostart_handled = False self._pc = None # RTCPeerConnection self._dc = None # RTCDataChannel self._connected = False # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------
[docs] def start(self): """Initialize the peer connection and background event loop.""" if self._started: return self._started = True self._running = True self._loop = asyncio.new_event_loop() self._thread = threading.Thread(target=self._run_loop, daemon=True) self._thread.start()
[docs] def close(self): """Close the peer connection and stop the event loop.""" if not self._started: return self._running = False if self._loop and self._loop.is_running(): self._loop.call_soon_threadsafe(self._loop.stop) if self._thread and self._thread.is_alive(): self._thread.join(timeout=2.0) self._started = False self._loop = None self._thread = None self._pc = None self._dc = None self._connected = False
[docs] def is_connected(self) -> bool: """Return True if the data channel is open.""" return self._connected
[docs] def is_running(self) -> bool: """Return True if the WebRTC component is actively running.""" return self._running and self._started
[docs] def poll(self) -> list[tuple]: """ Drain the inbox and return a list of (sender, message) tuples. Call this in on_update. Sender types: - "local": signaling data to forward to the remote peer (offer, answer, or ICE candidate dicts) - "datachannel": data received from the remote peer - "system": connection state events like {"event": "connected"}, {"event": "disconnected"}, {"event": "error", "message": "..."} """ messages = [] while not self._inbox.empty(): try: messages.append(self._inbox.get_nowait()) except queue.Empty: break return messages
[docs] def create_offer(self): """Create an SDP offer (caller side). Results appear in poll() as local signaling.""" if not self._running or not self._loop: return self._schedule(self._create_offer())
[docs] def create_answer(self): """Create an SDP answer (callee side). Results appear in poll() as local signaling.""" if not self._running or not self._loop: return self._schedule(self._create_answer())
[docs] def set_remote_description(self, sdp_dict: dict): """ Set the remote SDP description received from the signaling channel. sdp_dict should have keys "type" ("offer" or "answer") and "sdp". If type is "offer", an answer is automatically created. """ if not self._running or not self._loop: return self._schedule(self._set_remote_description(sdp_dict))
[docs] def add_ice_candidate(self, candidate_dict: dict): """ Add an ICE candidate received from the signaling channel. candidate_dict should have keys like "candidate", "sdpMid", "sdpMLineIndex". """ if not self._running or not self._loop: return self._schedule(self._add_ice_candidate(candidate_dict))
[docs] def send(self, message: str): """Send a string message over the data channel.""" if not self._connected or not self._dc: return try: self._dc.send(message) except Exception as e: print(f"[WebRTC] Send error: {e}")
[docs] def send_json(self, data): """Send a JSON-serializable object as a string message.""" try: self.send(json.dumps(data)) except (TypeError, ValueError) as e: print(f"[WebRTC] Failed to serialize JSON: {e}")
[docs] def send_bytes(self, data: bytes): """Send raw bytes over the data channel.""" if not self._connected or not self._dc: return try: self._dc.send(data) except Exception as e: print(f"[WebRTC] Send bytes error: {e}")
# ------------------------------------------------------------------ # Internal: event loop # ------------------------------------------------------------------ def _run_loop(self): """Entry point for the background thread.""" asyncio.set_event_loop(self._loop) try: self._loop.run_until_complete(self._init_peer_connection()) self._loop.run_forever() except Exception as e: if self._running: print(f"[WebRTC] Loop error: {e}") self._enqueue("system", {"event": "error", "message": str(e)}) finally: if self._pc: try: self._loop.run_until_complete(self._pc.close()) except Exception: pass try: self._loop.run_until_complete(self._loop.shutdown_asyncgens()) except Exception: pass self._loop.close() def _schedule(self, coro): """Schedule a coroutine on the event loop from the game thread.""" if self._loop and self._loop.is_running(): asyncio.run_coroutine_threadsafe(coro, self._loop) def _enqueue(self, sender, message): """Put a message into the inbox queue (thread-safe).""" try: self._inbox.put_nowait((sender, message)) except queue.Full: pass # ------------------------------------------------------------------ # Internal: peer connection setup # ------------------------------------------------------------------ async def _init_peer_connection(self): try: from aiortc import RTCPeerConnection, RTCConfiguration, RTCIceServer except ImportError: print("[WebRTC] 'aiortc' package not installed. Run: pip install aiortc") self._enqueue("system", {"event": "error", "message": "aiortc not installed"}) self._running = False return ice_list = [] for server_url in self.ice_servers.split(","): url = server_url.strip() if url: ice_list.append(RTCIceServer(urls=[url])) config = RTCConfiguration(iceServers=ice_list) if ice_list else RTCConfiguration() self._pc = RTCPeerConnection(configuration=config) @self._pc.on("icecandidate") def on_ice_candidate(candidate): if candidate: self._enqueue("local", { "type": "candidate", "candidate": candidate.candidate, "sdpMid": candidate.sdpMid, "sdpMLineIndex": candidate.sdpMLineIndex, }) @self._pc.on("connectionstatechange") async def on_connection_state_change(): state = self._pc.connectionState if state == "connected": self._connected = True self._enqueue("system", {"event": "connected"}) print("[WebRTC] Peer connected") elif state in ("disconnected", "failed", "closed"): self._connected = False self._enqueue("system", {"event": "disconnected", "state": state}) print(f"[WebRTC] Connection state: {state}") @self._pc.on("datachannel") def on_datachannel(channel): self._dc = channel self._setup_data_channel_events(channel) print(f"[WebRTC] Remote data channel received: {channel.label}") def _setup_data_channel_events(self, channel): @channel.on("open") def on_open(): self._connected = True self._enqueue("system", {"event": "datachannel_open", "label": channel.label}) print(f"[WebRTC] Data channel '{channel.label}' open") @channel.on("close") def on_close(): self._connected = False self._enqueue("system", {"event": "datachannel_close", "label": channel.label}) @channel.on("message") def on_message(message): self._enqueue("datachannel", message) # ------------------------------------------------------------------ # Internal: signaling # ------------------------------------------------------------------ async def _create_offer(self): try: from aiortc import RTCSessionDescription except ImportError: return # Create data channel (caller creates it) dc_options = {} if not self.ordered: dc_options["ordered"] = False if self.max_retransmits >= 0: dc_options["maxRetransmits"] = self.max_retransmits self._dc = self._pc.createDataChannel(self.data_channel_label, **dc_options) self._setup_data_channel_events(self._dc) offer = await self._pc.createOffer() await self._pc.setLocalDescription(offer) self._enqueue("local", { "type": "offer", "sdp": self._pc.localDescription.sdp, }) print("[WebRTC] Offer created") async def _create_answer(self): answer = await self._pc.createAnswer() await self._pc.setLocalDescription(answer) self._enqueue("local", { "type": "answer", "sdp": self._pc.localDescription.sdp, }) print("[WebRTC] Answer created") async def _set_remote_description(self, sdp_dict: dict): try: from aiortc import RTCSessionDescription except ImportError: return sdp_type = sdp_dict.get("type", "") sdp = sdp_dict.get("sdp", "") if not sdp_type or not sdp: return rd = RTCSessionDescription(sdp=sdp, type=sdp_type) await self._pc.setRemoteDescription(rd) print(f"[WebRTC] Remote description set ({sdp_type})") if sdp_type == "offer": await self._create_answer() async def _add_ice_candidate(self, candidate_dict: dict): try: from aiortc import RTCIceCandidate except ImportError: return candidate_str = candidate_dict.get("candidate", "") if not candidate_str: return sdp_mid = candidate_dict.get("sdpMid", "") sdp_mline_index = candidate_dict.get("sdpMLineIndex", 0) try: candidate = RTCIceCandidate( candidate=candidate_str, sdpMid=sdp_mid, sdpMLineIndex=sdp_mline_index, ) await self._pc.addIceCandidate(candidate) except Exception as e: print(f"[WebRTC] Failed to add ICE candidate: {e}") # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------
[docs] def on_destroy(self): """Called when the entity is destroyed.""" self.close()