Source code for core.systems.steering_system

"""Steering Behaviors System — composable autonomous movement."""
from __future__ import annotations

import math
import random
from typing import List

from core.ecs import System, Entity
from core.vector import Vector2
from core.components.transform import Transform
from core.components.steering import (
    SteeringAgentComponent,
    SeekBehavior,
    FleeBehavior,
    ArriveBehavior,
    WanderBehavior,
    SeparationBehavior,
    CohesionBehavior,
    AlignmentBehavior,
)
from core.logger import get_logger

_steer_logger = get_logger("steering")


[docs] class SteeringSystem(System): """Evaluates all steering behaviour components on each entity that has a SteeringAgentComponent, sums the weighted forces, and applies the resulting velocity to the entity's Transform.""" required_components = (SteeringAgentComponent,) def __init__(self): super().__init__()
[docs] def update(self, dt: float, entities: List[Entity]): if not self.world or dt <= 0.0: return agents = self.world.get_entities_with(SteeringAgentComponent) if not agents: return # Pre-collect positions + velocities for flocking queries agent_data: list[tuple[Entity, Vector2, SteeringAgentComponent]] = [] for entity in agents: agent = entity.get_component(SteeringAgentComponent) t = entity.get_component(Transform) if agent and t: agent_data.append((entity, Vector2(t.x, t.y), agent)) for entity, pos, agent in agent_data: comps = entity.components total_force = Vector2.zero() # Seek seek = comps.get(SeekBehavior) if seek and seek.enabled: total_force += self._seek(pos, seek.target, agent.max_speed) * seek.weight # Flee flee = comps.get(FleeBehavior) if flee and flee.enabled: total_force += self._flee(pos, flee.target, agent.max_speed, flee.panic_distance) * flee.weight # Arrive arrive = comps.get(ArriveBehavior) if arrive and arrive.enabled: total_force += self._arrive(pos, arrive.target, agent.max_speed, arrive.slow_radius, agent._velocity) * arrive.weight # Wander wander = comps.get(WanderBehavior) if wander and wander.enabled: force, new_angle = self._wander( pos, agent._velocity, agent.max_speed, wander.circle_distance, wander.circle_radius, wander._wander_angle, wander.angle_change, ) wander._wander_angle = new_angle total_force += force * wander.weight # Separation sep = comps.get(SeparationBehavior) if sep and sep.enabled: total_force += self._separation(entity, pos, agent_data, sep.neighbor_radius) * sep.weight # Cohesion coh = comps.get(CohesionBehavior) if coh and coh.enabled: total_force += self._cohesion(entity, pos, agent_data, coh.neighbor_radius, agent.max_speed) * coh.weight # Alignment ali = comps.get(AlignmentBehavior) if ali and ali.enabled: total_force += self._alignment(entity, agent_data, ali.neighbor_radius) * ali.weight # Clamp total force if total_force.sqr_magnitude() > agent.max_force * agent.max_force: total_force = total_force.normalize() * agent.max_force # Apply force -> acceleration -> velocity accel = total_force * (1.0 / agent.mass) agent._velocity += accel * dt # Drag if agent.drag > 0.0: damping = max(0.0, 1.0 - agent.drag * dt) agent._velocity *= damping # Clamp speed spd_sq = agent._velocity.sqr_magnitude() if spd_sq > agent.max_speed * agent.max_speed: agent._velocity = agent._velocity.normalize() * agent.max_speed # Move transform t = entity.get_component(Transform) if t: t.x += agent._velocity.x * dt t.y += agent._velocity.y * dt
# -- Individual behaviour calculations ---------------------------------- @staticmethod def _seek(pos: Vector2, target: Vector2, max_speed: float) -> Vector2: desired = target - pos if desired.sqr_magnitude() < 0.0001: return Vector2.zero() return desired.normalize() * max_speed @staticmethod def _flee(pos: Vector2, target: Vector2, max_speed: float, panic_dist: float) -> Vector2: diff = pos - target dist_sq = diff.sqr_magnitude() if panic_dist > 0 and dist_sq > panic_dist * panic_dist: return Vector2.zero() if dist_sq < 0.0001: return Vector2(random.uniform(-1, 1), random.uniform(-1, 1)).normalize() * max_speed return diff.normalize() * max_speed @staticmethod def _arrive(pos: Vector2, target: Vector2, max_speed: float, slow_radius: float, current_vel: Vector2) -> Vector2: diff = target - pos dist = diff.magnitude() if dist < 0.5: return -current_vel if dist < slow_radius: desired_speed = max_speed * (dist / slow_radius) else: desired_speed = max_speed desired = diff.normalize() * desired_speed return desired - current_vel @staticmethod def _wander(pos: Vector2, velocity: Vector2, max_speed: float, circle_dist: float, circle_rad: float, wander_angle: float, angle_change: float): speed = velocity.magnitude() if speed < 0.001: heading = Vector2(1.0, 0.0) else: heading = velocity.normalize() circle_center = heading * circle_dist displacement = Vector2( math.cos(math.radians(wander_angle)) * circle_rad, math.sin(math.radians(wander_angle)) * circle_rad, ) new_angle = wander_angle + random.uniform(-angle_change, angle_change) force = circle_center + displacement return force, new_angle @staticmethod def _separation(self_entity: Entity, pos: Vector2, agent_data: list, neighbor_radius: float) -> Vector2: force = Vector2.zero() r_sq = neighbor_radius * neighbor_radius for entity, other_pos, _ in agent_data: if entity is self_entity: continue diff = pos - other_pos dist_sq = diff.sqr_magnitude() if 0.0 < dist_sq < r_sq: dist = math.sqrt(dist_sq) force += diff.normalize() * (1.0 / max(dist, 0.001)) return force @staticmethod def _cohesion(self_entity: Entity, pos: Vector2, agent_data: list, neighbor_radius: float, max_speed: float) -> Vector2: center = Vector2.zero() count = 0 r_sq = neighbor_radius * neighbor_radius for entity, other_pos, _ in agent_data: if entity is self_entity: continue if pos.distance_to_squared(other_pos) < r_sq: center += other_pos count += 1 if count == 0: return Vector2.zero() center *= (1.0 / count) desired = center - pos if desired.sqr_magnitude() < 0.0001: return Vector2.zero() return desired.normalize() * max_speed @staticmethod def _alignment(self_entity: Entity, agent_data: list, neighbor_radius: float) -> Vector2: avg_vel = Vector2.zero() count = 0 self_pos = None for entity, other_pos, agent in agent_data: if entity is self_entity: self_pos = other_pos break if self_pos is None: return Vector2.zero() r_sq = neighbor_radius * neighbor_radius for entity, other_pos, agent in agent_data: if entity is self_entity: continue if self_pos.distance_to_squared(other_pos) < r_sq: avg_vel += agent._velocity count += 1 if count == 0: return Vector2.zero() avg_vel *= (1.0 / count) return avg_vel