Source code for core.components.multiplayer

from core.ecs import Component
from core.multiplayer.protocol import MessageType, encode_message, decode_message
from core.multiplayer.room import Room, Player
import uuid
import time
import queue
from core.logger import get_logger

_mp_logger = get_logger("multiplayer")


[docs] class MultiplayerComponent(Component): """ High-level multiplayer manager component. Attach to a single entity (e.g. a GameManager) to manage multiplayer sessions. Uses a WebSocketComponent on the same entity for transport. Provides lobby management, player tracking, RPCs, and state sync. Modes: - "host": Creates a WebSocket server, manages the room as authority. - "client": Connects to a host's WebSocket server. Usage in scripts: mp = self.entity.get_component(MultiplayerComponent) # Host a game mp.host_game("MyRoom") # Or join a game mp.join_game("ws://192.168.1.10:8765", "PlayerName") # In on_update, pump the network mp.poll() # Check players for player in mp.get_players(): print(player.name, player.is_ready) # Set ready mp.set_ready(True) # Start game (host only, when all ready) mp.start_game() # Send RPC to all players mp.rpc("take_damage", {"amount": 10}) # Send RPC to specific player mp.rpc_to(player_id, "heal", {"amount": 5}) # Send RPC to host only mp.rpc_to_host("request_spawn", {"prefab": "bullet"}) # Send custom data mp.send_custom("chat", {"text": "Hello!"}) # Listen for events (via global event system) self.subscribe_to_event("mp_player_joined", self.on_player_joined) self.subscribe_to_event("mp_player_left", self.on_player_left) self.subscribe_to_event("mp_game_started", self.on_game_started) self.subscribe_to_event("mp_rpc", self.on_rpc) self.subscribe_to_event("mp_custom", self.on_custom) self.subscribe_to_event("mp_state_sync", self.on_state_sync) self.subscribe_to_event("mp_disconnected", self.on_disconnected) """ MODE_HOST = "host" MODE_CLIENT = "client" def __init__( self, player_name: str = "Player", max_players: int = 8, sync_rate: float = 20.0, port: int = 8765, ): self.entity = None self.player_name = str(player_name or "Player") self.max_players = max(2, int(max_players)) self.sync_rate = max(1.0, float(sync_rate)) self.port = max(1, min(65535, int(port))) # Runtime state (not serialized) self._mode: str = "" self._room: Room | None = None self._local_player_id: str = "" self._connected = False self._active = False self._sync_timer = 0.0 self._sync_interval = 1.0 / self.sync_rate self._rpc_handlers: dict[str, callable] = {} self._pending_events: list[tuple] = [] # ------------------------------------------------------------------ # Public API — Connection # ------------------------------------------------------------------
[docs] def host_game(self, room_name: str = "Room"): """Start hosting a multiplayer game. Sets up WebSocket server.""" if self._active: return from core.components.websocket import WebSocketComponent ws = self._ensure_websocket() ws.mode = "server" ws.host = "0.0.0.0" ws.port = self.port ws.start() self._mode = self.MODE_HOST self._local_player_id = str(uuid.uuid4()) self._room = Room(room_name=room_name, max_players=self.max_players) local_player = Player( player_id=self._local_player_id, name=self.player_name, client_id=0, is_host=True, is_local=True, ) self._room.add_player(local_player) self._active = True self._connected = True self._emit("mp_connected", {"mode": "host"})
[docs] def join_game(self, url: str, player_name: str = None): """Join a hosted game as a client.""" if self._active: return if player_name: self.player_name = player_name from core.components.websocket import WebSocketComponent ws = self._ensure_websocket() ws.mode = "client" ws.url = url ws.start() self._mode = self.MODE_CLIENT self._local_player_id = str(uuid.uuid4()) self._active = True
[docs] def disconnect(self): """Disconnect from the multiplayer session.""" if not self._active: return from core.components.websocket import WebSocketComponent ws = self.entity.get_component(WebSocketComponent) if self.entity else None if ws and self._connected: msg = encode_message(MessageType.DISCONNECT, { "player_id": self._local_player_id }, self._local_player_id) ws.send(msg) if ws: ws.stop() self._active = False self._connected = False self._room = None self._mode = "" self._emit("mp_disconnected", {})
# ------------------------------------------------------------------ # Public API — Lobby # ------------------------------------------------------------------
[docs] def set_ready(self, ready: bool = True): """Set local player's ready state.""" if not self._active or not self._room: return local = self._room.get_player(self._local_player_id) if local: local.is_ready = ready msg = encode_message(MessageType.PLAYER_READY, { "player_id": self._local_player_id, "ready": ready, }, self._local_player_id) self._send(msg)
[docs] def start_game(self): """Host only: Start the game if all players are ready.""" if self._mode != self.MODE_HOST or not self._room: return if not self._room.all_ready(): return self._room.started = True msg = encode_message(MessageType.GAME_START, {}, self._local_player_id) self._broadcast(msg) self._emit("mp_game_started", {})
[docs] def kick_player(self, player_id: str): """Host only: Kick a player from the room.""" if self._mode != self.MODE_HOST or not self._room: return player = self._room.remove_player(player_id) if not player: return msg = encode_message(MessageType.PLAYER_LEFT, { "player_id": player_id, "reason": "kicked", }, self._local_player_id) self._broadcast(msg) # Disconnect the client from core.components.websocket import WebSocketComponent ws = self.entity.get_component(WebSocketComponent) if self.entity else None if ws and player.client_id: ws.send_to(player.client_id, encode_message(MessageType.DISCONNECT, { "reason": "kicked" }))
# ------------------------------------------------------------------ # Public API — Players # ------------------------------------------------------------------
[docs] def get_players(self) -> list[Player]: """Return list of all players in the room.""" if not self._room: return [] return list(self._room.players.values())
[docs] def get_player(self, player_id: str) -> Player | None: """Get a specific player by ID.""" if not self._room: return None return self._room.get_player(player_id)
[docs] def get_local_player(self) -> Player | None: """Get the local player.""" if not self._room: return None return self._room.get_player(self._local_player_id)
[docs] def get_player_count(self) -> int: return self._room.get_player_count() if self._room else 0
@property def local_player_id(self) -> str: return self._local_player_id @property def is_host(self) -> bool: return self._mode == self.MODE_HOST @property def is_connected(self) -> bool: return self._connected @property def is_active(self) -> bool: return self._active @property def room(self) -> Room | None: return self._room # ------------------------------------------------------------------ # Public API — RPC # ------------------------------------------------------------------
[docs] def register_rpc(self, method_name: str, handler: callable): """Register an RPC handler function.""" self._rpc_handlers[method_name] = handler
[docs] def rpc(self, method: str, data: dict = None): """Send an RPC to all players (including self).""" msg = encode_message(MessageType.RPC_CALL, { "method": method, "args": data or {}, "target": "all", }, self._local_player_id) self._broadcast(msg) # Also invoke locally self._handle_rpc(method, data or {}, self._local_player_id)
[docs] def rpc_to(self, player_id: str, method: str, data: dict = None): """Send an RPC to a specific player.""" payload = { "method": method, "args": data or {}, "target": player_id, } if player_id == self._local_player_id: self._handle_rpc(method, data or {}, self._local_player_id) return msg = encode_message(MessageType.RPC_CALL, payload, self._local_player_id) if self._mode == self.MODE_HOST and self._room: player = self._room.get_player(player_id) if player and player.client_id: from core.components.websocket import WebSocketComponent ws = self.entity.get_component(WebSocketComponent) if self.entity else None if ws: ws.send_to(player.client_id, msg) else: self._send(msg)
[docs] def rpc_to_host(self, method: str, data: dict = None): """Send an RPC to the host.""" if self._mode == self.MODE_HOST: self._handle_rpc(method, data or {}, self._local_player_id) return msg = encode_message(MessageType.RPC_CALL, { "method": method, "args": data or {}, "target": "host", }, self._local_player_id) self._send(msg)
# ------------------------------------------------------------------ # Public API — Custom Messages & State # ------------------------------------------------------------------
[docs] def send_custom(self, channel: str, data: dict = None): """Send a custom message to all players.""" msg = encode_message(MessageType.CUSTOM, { "channel": channel, "payload": data or {}, }, self._local_player_id) self._broadcast(msg) self._emit("mp_custom", {"channel": channel, "payload": data or {}, "sender": self._local_player_id})
[docs] def send_state(self, entity_net_id: str, state_data: dict): """Send entity state data (used by NetworkIdentityComponent).""" msg = encode_message(MessageType.STATE_UPDATE, { "net_id": entity_net_id, "state": state_data, }, self._local_player_id) self._broadcast(msg)
[docs] def request_spawn(self, prefab_path: str, owner_id: str = "", data: dict = None): """Host only: Request spawning a networked entity.""" if self._mode != self.MODE_HOST: self.rpc_to_host("_mp_spawn_request", { "prefab": prefab_path, "owner": owner_id or self._local_player_id, "data": data or {}, }) return net_id = str(uuid.uuid4())[:8] msg = encode_message(MessageType.SPAWN_ENTITY, { "net_id": net_id, "prefab": prefab_path, "owner": owner_id or self._local_player_id, "data": data or {}, }, self._local_player_id) self._broadcast(msg) self._emit("mp_spawn", { "net_id": net_id, "prefab": prefab_path, "owner": owner_id or self._local_player_id, "data": data or {}, })
[docs] def request_despawn(self, entity_net_id: str): """Host only: Despawn a networked entity.""" msg = encode_message(MessageType.DESPAWN_ENTITY, { "net_id": entity_net_id, }, self._local_player_id) self._broadcast(msg) self._emit("mp_despawn", {"net_id": entity_net_id})
# ------------------------------------------------------------------ # Poll — call every frame # ------------------------------------------------------------------
[docs] def poll(self): """ Process incoming WebSocket messages. Call this in on_update. Emits global events for game scripts to handle. """ if not self._active or not self.entity: return from core.components.websocket import WebSocketComponent ws = self.entity.get_component(WebSocketComponent) if not ws: return for sender, raw in ws.poll(): if sender == "system": self._handle_system_event(raw) continue msg = decode_message(raw) if isinstance(raw, str) else None if not msg: continue self._handle_message(msg, sender)
# ------------------------------------------------------------------ # Internal — message handling # ------------------------------------------------------------------ def _handle_system_event(self, event_data): """Handle WebSocket system events (connect/disconnect).""" if not isinstance(event_data, dict): return event = event_data.get("event", "") if event == "connected": if self._mode == self.MODE_CLIENT: self._connected = True msg = encode_message(MessageType.JOIN_REQUEST, { "player_id": self._local_player_id, "player_name": self.player_name, }) self._send(msg) self._emit("mp_connected", {"mode": "client"}) elif event == "disconnected": client_id = event_data.get("client_id", 0) if self._mode == self.MODE_HOST and client_id and self._room: player = self._room.get_player_by_client(client_id) if player: self._room.remove_player(player.id) leave_msg = encode_message(MessageType.PLAYER_LEFT, { "player_id": player.id, "reason": "disconnected", }, self._local_player_id) self._broadcast(leave_msg) self._emit("mp_player_left", {"player": player.to_dict(), "reason": "disconnected"}) elif self._mode == self.MODE_CLIENT: self._connected = False self._emit("mp_disconnected", {"reason": event_data.get("reason", "")}) def _handle_message(self, msg: dict, sender): """Route a decoded multiplayer message.""" msg_type = msg["type"] data = msg["data"] sender_id = msg.get("sender", "") if msg_type == MessageType.JOIN_REQUEST: self._on_join_request(data, sender) elif msg_type == MessageType.JOIN_ACCEPTED: self._on_join_accepted(data) elif msg_type == MessageType.JOIN_REJECTED: self._emit("mp_join_rejected", data) elif msg_type == MessageType.PLAYER_JOINED: self._on_player_joined(data) elif msg_type == MessageType.PLAYER_LEFT: self._on_player_left(data) elif msg_type == MessageType.LOBBY_STATE: self._on_lobby_state(data) elif msg_type == MessageType.PLAYER_READY: self._on_player_ready(data) elif msg_type == MessageType.GAME_START: if self._room: self._room.started = True self._emit("mp_game_started", data) elif msg_type == MessageType.STATE_UPDATE: self._emit("mp_state_sync", data) elif msg_type == MessageType.SPAWN_ENTITY: self._emit("mp_spawn", data) elif msg_type == MessageType.DESPAWN_ENTITY: self._emit("mp_despawn", data) elif msg_type == MessageType.RPC_CALL: self._on_rpc_call(data, sender_id, sender) elif msg_type == MessageType.CUSTOM: self._emit("mp_custom", { "channel": data.get("channel", ""), "payload": data.get("payload", {}), "sender": sender_id, }) elif msg_type == MessageType.PING: pong = encode_message(MessageType.PONG, {"ts": data.get("ts", 0)}, self._local_player_id) self._send(pong) elif msg_type == MessageType.PONG: if self._room: player = self._room.get_player(sender_id) if player: player.latency_ms = (time.time() - data.get("ts", 0)) * 1000 elif msg_type == MessageType.DISCONNECT: pass # handled by system events # ------------------------------------------------------------------ # Internal — specific handlers # ------------------------------------------------------------------ def _on_join_request(self, data: dict, client_sender): """Host: handle a client's join request.""" if self._mode != self.MODE_HOST or not self._room: return player_id = data.get("player_id", "") player_name = data.get("player_name", "Player") client_id = client_sender if isinstance(client_sender, int) else 0 if self._room.is_full(): reject = encode_message(MessageType.JOIN_REJECTED, {"reason": "Room is full"}) from core.components.websocket import WebSocketComponent ws = self.entity.get_component(WebSocketComponent) if self.entity else None if ws and client_id: ws.send_to(client_id, reject) return new_player = Player( player_id=player_id, name=player_name, client_id=client_id, is_host=False, ) self._room.add_player(new_player) # Send accept + full room state to the new client accept = encode_message(MessageType.JOIN_ACCEPTED, { "player_id": player_id, "room": self._room.to_dict(), }) from core.components.websocket import WebSocketComponent ws = self.entity.get_component(WebSocketComponent) if self.entity else None if ws and client_id: ws.send_to(client_id, accept) # Broadcast to all other clients joined_msg = encode_message(MessageType.PLAYER_JOINED, { "player": new_player.to_dict(), }, self._local_player_id) self._broadcast(joined_msg, exclude_client=client_id) self._emit("mp_player_joined", {"player": new_player.to_dict()}) def _on_join_accepted(self, data: dict): """Client: handle join acceptance from host.""" self._connected = True room_data = data.get("room", {}) self._room = Room.from_dict(room_data) # Mark local player local = self._room.get_player(self._local_player_id) if local: local.is_local = True self._emit("mp_joined", {"room": room_data}) def _on_player_joined(self, data: dict): """Handle a new player joining.""" pdata = data.get("player", {}) if not self._room: return player = Player.from_dict(pdata) self._room.add_player(player) self._emit("mp_player_joined", {"player": pdata}) def _on_player_left(self, data: dict): """Handle a player leaving.""" player_id = data.get("player_id", "") reason = data.get("reason", "") if self._room: player = self._room.remove_player(player_id) if player: self._emit("mp_player_left", {"player": player.to_dict(), "reason": reason}) def _on_lobby_state(self, data: dict): """Client: receive full lobby state update.""" room_data = data.get("room", {}) if room_data: self._room = Room.from_dict(room_data) local = self._room.get_player(self._local_player_id) if local: local.is_local = True self._emit("mp_lobby_state", {"room": room_data}) def _on_player_ready(self, data: dict): """Handle player ready state change.""" player_id = data.get("player_id", "") ready = data.get("ready", False) if self._room: player = self._room.get_player(player_id) if player: player.is_ready = ready # Host relays to all if self._mode == self.MODE_HOST: relay = encode_message(MessageType.PLAYER_READY, data, self._local_player_id) self._broadcast(relay) self._emit("mp_player_ready", {"player_id": player_id, "ready": ready}) def _on_rpc_call(self, data: dict, sender_id: str, raw_sender): """Handle an incoming RPC call.""" method = data.get("method", "") args = data.get("args", {}) target = data.get("target", "all") # Host relays RPCs to appropriate targets if self._mode == self.MODE_HOST and target == "all": relay = encode_message(MessageType.RPC_CALL, data, sender_id) exclude = raw_sender if isinstance(raw_sender, int) else 0 self._broadcast(relay, exclude_client=exclude) # Check if this RPC is for us if target == "all" or target == self._local_player_id or \ (target == "host" and self._mode == self.MODE_HOST): self._handle_rpc(method, args, sender_id) def _handle_rpc(self, method: str, args: dict, sender_id: str): """Invoke a registered RPC handler or emit an event.""" handler = self._rpc_handlers.get(method) if handler: try: handler(sender_id, args) except Exception as e: _mp_logger.error("RPC handler error", method=method, error=str(e)) self._emit("mp_rpc", {"method": method, "args": args, "sender": sender_id}) # ------------------------------------------------------------------ # Internal — transport helpers # ------------------------------------------------------------------ def _ensure_websocket(self): """Get or create a WebSocketComponent on this entity.""" from core.components.websocket import WebSocketComponent ws = self.entity.get_component(WebSocketComponent) if self.entity else None if not ws and self.entity: ws = WebSocketComponent() self.entity.add_component(ws) return ws def _send(self, msg: str): """Send a message (client sends to server, host broadcasts).""" from core.components.websocket import WebSocketComponent ws = self.entity.get_component(WebSocketComponent) if self.entity else None if ws: ws.send(msg) def _broadcast(self, msg: str, exclude_client: int = 0): """Host: broadcast to all connected clients (optionally excluding one).""" from core.components.websocket import WebSocketComponent ws = self.entity.get_component(WebSocketComponent) if self.entity else None if not ws: return if self._mode == self.MODE_HOST: for cid in ws.get_client_ids(): if cid != exclude_client: ws.send_to(cid, msg) else: ws.send(msg) def _emit(self, event_name: str, data: dict): """Emit a global event through the entity's world event system.""" if self.entity and self.entity.world: self.entity.world.events.emit(event_name, data) # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------
[docs] def on_destroy(self): self.disconnect()