Source code for core.headless_server

"""
Headless server runner for AxisPy Engine.

Runs the game loop without any display, rendering, or audio.
Only processes physics, scripts, and networking systems.
Designed for dedicated game servers on Linux and Windows.

Usage:
    python -m core.headless_server <scene_path>
    python core/headless_server.py <scene_path>
"""
import os
import sys
import json
import time
import signal
import argparse

# Set environment to suppress pygame display before importing it
os.environ["SDL_VIDEODRIVER"] = "dummy"
os.environ["SDL_AUDIODRIVER"] = "dummy"

import pygame


[docs] def run_headless(scene_path: str, tick_rate: float = 60.0, verbose: bool = False): """ Run the engine in headless mode (no display, no rendering, no audio). Args: scene_path: Path to the .scn scene file to load. tick_rate: Server tick rate in Hz (default 60). verbose: If True, print periodic status info. """ project_dir = "" project_config = {} if scene_path: scene_abs_path = os.path.abspath(scene_path) env_project_dir = os.environ.get("AXISPY_PROJECT_PATH", "").strip() if env_project_dir and os.path.exists(env_project_dir): project_dir = os.path.abspath(env_project_dir) else: scene_parent = os.path.dirname(scene_abs_path) if os.path.basename(scene_parent).lower() == "scenes": project_dir = os.path.dirname(scene_parent) else: project_dir = scene_parent if project_dir not in sys.path: sys.path.insert(0, project_dir) os.chdir(project_dir) from core.resources import ResourceManager ResourceManager.set_headless(True) ResourceManager.set_base_path(project_dir) # Read project config if scene_path: config_path = os.path.join(project_dir, "project.config") if os.path.exists(config_path): try: with open(config_path, "r") as f: project_config = json.load(f) except Exception as e: print(f"[Server] Failed to read project.config: {e}") # Initialize pygame in minimal mode (no display, no audio) pygame.init() # Create a tiny hidden surface so pygame doesn't complain screen = pygame.display.set_mode((1, 1), pygame.NOFRAME) pygame.display.set_caption("AxisPy Headless Server") from core.scene import Scene from core.systems.physics_system import PhysicsSystem from core.systems.script_system import ScriptSystem from core.systems.network_system import NetworkSystem from core.systems.timer_system import TimerSystem from core.systems.event_dispatch_system import EventDispatchSystem from core.serializer import SceneSerializer from core.input import Input # Load scene def load_scene(target_path: str) -> Scene: if target_path and os.path.exists(target_path): try: with open(target_path, "r") as f: loaded = SceneSerializer.from_json(f.read()) return loaded except Exception as e: print(f"[Server] Failed to load scene: {e}") fallback = Scene() fallback.setup_default() return fallback def apply_world_settings(target_scene: Scene): """Apply project config to world (layers, groups, collision matrix).""" config_layers = project_config.get("layers", ["Default"]) normalized_layers = [] seen = set() if isinstance(config_layers, list): for layer in config_layers: name = str(layer).strip() if not name: continue low = name.lower() if low in seen: continue seen.add(low) normalized_layers.append(name) if "default" in seen: normalized_layers = [l for l in normalized_layers if l.lower() != "default"] normalized_layers.insert(0, "Default") target_scene.world.layers = normalized_layers config_groups = project_config.get("groups", []) normalized_groups = [] seen_groups = set() if isinstance(config_groups, list): for g in config_groups: text = str(g).strip() if not text: continue low = text.lower() if low in seen_groups: continue seen_groups.add(low) normalized_groups.append(text) world = target_scene.world for gn in list(world.groups.keys()): if gn not in normalized_groups: members = list(world.groups.get(gn, set())) for entity in members: entity.remove_group(gn) for gn in normalized_groups: world.groups.setdefault(gn, set()) raw_matrix = project_config.get("physics_collision_matrix", {}) if not isinstance(raw_matrix, dict): raw_matrix = {} normalized_matrix = {} for row in normalized_groups: targets = raw_matrix.get(row, normalized_groups) if not isinstance(targets, list): targets = normalized_groups allowed = [] seen_t = set() for t in targets: tn = str(t).strip() if tn not in normalized_groups: continue lt = tn.lower() if lt in seen_t: continue seen_t.add(lt) allowed.append(tn) normalized_matrix[row] = allowed for row in normalized_groups: for target in list(normalized_matrix.get(row, [])): peer = normalized_matrix.setdefault(target, []) if row not in peer: peer.append(row) world.physics_group_order = list(normalized_groups) world.physics_collision_matrix = normalized_matrix current_scene_path = os.path.abspath(scene_path) if scene_path and os.path.exists(scene_path) else "" scene = load_scene(current_scene_path) apply_world_settings(scene) # Add headless systems (no render, no audio, no animation, no particles, no UI) physics_system = PhysicsSystem() scene.world.add_system(physics_system) script_system = ScriptSystem() scene.world.add_system(script_system) network_system = NetworkSystem() scene.world.add_system(network_system) timer_system = TimerSystem() scene.world.add_system(timer_system) event_dispatch_system = EventDispatchSystem() scene.world.add_system(event_dispatch_system) def attach_systems(target_scene: Scene): target_scene.world.add_system(physics_system) target_scene.world.add_system(script_system) target_scene.world.add_system(network_system) target_scene.world.add_system(timer_system) target_scene.world.add_system(event_dispatch_system) def resolve_scene_change(scene_name: str, current_path: str) -> str: requested = str(scene_name or "").strip() if not requested: return "" requested = os.path.normpath(requested) has_ext = bool(os.path.splitext(requested)[1]) variants = [requested] if has_ext else [requested, requested + ".scn"] candidates = [] for variant in variants: if os.path.isabs(variant): candidates.append(variant) continue if project_dir: candidates.append(os.path.normpath(os.path.join(project_dir, variant))) candidates.append(os.path.normpath(os.path.join(project_dir, "scenes", variant))) if current_path: candidates.append(os.path.normpath(os.path.join(os.path.dirname(current_path), variant))) for c in candidates: if c and os.path.exists(c): return os.path.abspath(c) return "" def teardown_scene(target_scene: Scene): if not target_scene: return for entity in list(target_scene.world.entities): target_scene.world.destroy_entity(entity) scene.world.sync_interpolation_state() # Server loop fixed_dt = 1.0 / tick_rate max_substeps = 8 running = True tick_count = 0 start_time = time.time() def signal_handler(sig, frame): nonlocal running print(f"\n[Server] Received signal {sig}, shutting down...") running = False signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) print(f"[Server] Headless server started") print(f"[Server] Scene: {current_scene_path}") print(f"[Server] Tick rate: {tick_rate} Hz (dt={fixed_dt:.4f}s)") print(f"[Server] Press Ctrl+C to stop") last_time = time.perf_counter() accumulator = 0.0 status_interval = 10.0 last_status_time = time.time() while running: now = time.perf_counter() frame_dt = now - last_time last_time = now # Cap frame delta to avoid spiral of death if frame_dt > 0.25: frame_dt = 0.25 accumulator += frame_dt # Process pygame events (minimal — just to keep pygame happy) for event in pygame.event.get(): if event.type == pygame.QUIT: running = False # Handle scene changes requested_scene = getattr(scene.world, "_requested_scene_name", "") if requested_scene: scene.world._requested_scene_name = "" resolved = resolve_scene_change(requested_scene, current_scene_path) if resolved: teardown_scene(scene) scene = load_scene(resolved) current_scene_path = resolved apply_world_settings(scene) attach_systems(scene) physics_system._active_collisions.clear() scene.world.sync_interpolation_state() accumulator = 0.0 print(f"[Server] Scene changed to: {resolved}") else: print(f"[Server] Scene change failed: {requested_scene}") # Fixed timestep simulation step_count = 0 while accumulator >= fixed_dt and step_count < max_substeps: scene.world.simulate(fixed_dt) accumulator -= fixed_dt step_count += 1 tick_count += 1 if step_count == max_substeps and accumulator >= fixed_dt: accumulator = min(accumulator, fixed_dt) # Periodic status output if verbose: now_wall = time.time() if now_wall - last_status_time >= status_interval: elapsed = now_wall - start_time entity_count = len(scene.world.entities) print(f"[Server] Uptime: {elapsed:.0f}s | Ticks: {tick_count} | Entities: {entity_count}") last_status_time = now_wall # Sleep to avoid busy-waiting (target slightly under tick interval) sleep_time = fixed_dt - (time.perf_counter() - last_time) if sleep_time > 0.001: time.sleep(sleep_time * 0.9) # Shutdown print(f"[Server] Shutting down...") teardown_scene(scene) pygame.quit() elapsed = time.time() - start_time print(f"[Server] Server stopped after {elapsed:.1f}s ({tick_count} ticks)")
[docs] def main(): parser = argparse.ArgumentParser(description="AxisPy Engine - Headless Server") parser.add_argument("scene", nargs="?", default=None, help="Path to the .scn scene file") parser.add_argument("--tick-rate", type=float, default=60.0, help="Server tick rate in Hz (default: 60)") parser.add_argument("--verbose", "-v", action="store_true", help="Print periodic status info") args = parser.parse_args() if not args.scene: print("[Server] Error: No scene path provided.") print("[Server] Usage: python -m core.headless_server <scene_path>") sys.exit(1) run_headless(args.scene, tick_rate=args.tick_rate, verbose=args.verbose)
if __name__ == "__main__": main()