"""Finite State Machine for entity behavior management.
Usage from a user script::
from core.state_machine import StateMachine, State
class IdleState(State):
def on_enter(self):
print("Entering idle")
def on_update(self, dt):
if some_condition:
self.machine.transition_to("walk")
def on_exit(self):
print("Leaving idle")
class WalkState(State):
def on_enter(self):
print("Starting to walk")
def on_update(self, dt):
if another_condition:
self.machine.transition_to("idle")
def on_exit(self):
print("Stopped walking")
class MyScript:
def on_start(self):
self.state_machine = StateMachine(self.entity)
self.state_machine.add_state("idle", IdleState())
self.state_machine.add_state("walk", WalkState())
self.state_machine.start("idle")
def on_update(self, dt):
self.state_machine.update(dt)
"""
from __future__ import annotations
from core.logger import get_logger
_fsm_logger = get_logger("state_machine")
[docs]
class State:
"""Base class for FSM states. Override the lifecycle methods as needed."""
def __init__(self):
self.machine: StateMachine | None = None
self.entity = None
[docs]
def on_enter(self):
"""Called when this state becomes active."""
pass
[docs]
def on_update(self, dt: float):
"""Called every frame while this state is active."""
pass
[docs]
def on_exit(self):
"""Called when leaving this state."""
pass
[docs]
class StateMachine:
"""Simple finite state machine bound to an entity.
States are registered by name and transitions are triggered explicitly
via ``transition_to(name)``.
"""
def __init__(self, entity=None):
self.entity = entity
self._states: dict[str, State] = {}
self._current_state: State | None = None
self._current_name: str = ""
self._previous_name: str = ""
@property
def current_state(self) -> str:
"""Name of the currently active state, or empty string."""
return self._current_name
@property
def previous_state(self) -> str:
"""Name of the previously active state."""
return self._previous_name
[docs]
def add_state(self, name: str, state: State) -> None:
"""Register a state under *name*."""
state.machine = self
state.entity = self.entity
self._states[name] = state
[docs]
def remove_state(self, name: str) -> None:
"""Remove a registered state. If it is the current state, exit it first."""
if name == self._current_name and self._current_state is not None:
try:
self._current_state.on_exit()
except Exception as e:
_fsm_logger.error("Error in State.on_exit during remove", state=name, error=str(e))
self._current_state = None
self._current_name = ""
self._states.pop(name, None)
[docs]
def has_state(self, name: str) -> bool:
return name in self._states
[docs]
def start(self, name: str) -> None:
"""Set the initial state without calling on_exit on any previous state."""
state = self._states.get(name)
if state is None:
_fsm_logger.warning("State not found", state=name)
return
self._current_state = state
self._current_name = name
try:
state.on_enter()
except Exception as e:
_fsm_logger.error("Error in State.on_enter", state=name, error=str(e))
[docs]
def transition_to(self, name: str) -> None:
"""Transition from the current state to *name*."""
if name == self._current_name:
return
new_state = self._states.get(name)
if new_state is None:
_fsm_logger.warning("State not found for transition", target=name)
return
# Exit current
if self._current_state is not None:
try:
self._current_state.on_exit()
except Exception as e:
_fsm_logger.error("Error in State.on_exit", state=self._current_name, error=str(e))
self._previous_name = self._current_name
self._current_state = new_state
self._current_name = name
# Enter new
try:
new_state.on_enter()
except Exception as e:
_fsm_logger.error("Error in State.on_enter", state=name, error=str(e))
[docs]
def update(self, dt: float) -> None:
"""Tick the current state. Call once per frame."""
if self._current_state is not None:
try:
self._current_state.on_update(dt)
except Exception as e:
_fsm_logger.error("Error in State.on_update", state=self._current_name, error=str(e))