Source code for core.systems.particle_system

from __future__ import annotations
import array
import math
import random
from core.ecs import System, Entity
from core.components.transform import Transform
from core.components.particle_emitter import ParticleEmitterComponent


[docs] class ParticleSystem(System): required_components = (ParticleEmitterComponent,) def __init__(self): super().__init__() self._random = random.Random()
[docs] def update(self, dt: float, entities: list[Entity]): if self.world: target_entities = self.world.get_entities_with(Transform, ParticleEmitterComponent) else: target_entities = entities for entity in target_entities: transform = entity.get_component(Transform) emitter = entity.get_component(ParticleEmitterComponent) if not transform or not emitter: continue state = self._ensure_state(emitter) self._update_emitter(dt, transform, emitter, state)
def _ensure_state(self, emitter: ParticleEmitterComponent): if emitter._particle_state: return emitter._particle_state max_particles = max(1, int(emitter.max_particles)) _f = lambda n, v=0.0: array.array('f', [v] * n) _i = lambda n, v=0: array.array('i', [v] * n) emitter._particle_state = { "alive": 0, "max": max_particles, "rate_carry": 0.0, "burst_timer": 0.0, "elapsed": 0.0, "x": _f(max_particles), "y": _f(max_particles), "vx": _f(max_particles), "vy": _f(max_particles), "age": _f(max_particles), "life": _f(max_particles, 1.0), "size0": _f(max_particles, 1.0), "size1": _f(max_particles, 1.0), "r0": _i(max_particles, 255), "g0": _i(max_particles, 255), "b0": _i(max_particles, 255), "a0": _i(max_particles, 255), "r1": _i(max_particles, 255), "g1": _i(max_particles, 255), "b1": _i(max_particles, 255), "a1": _i(max_particles, 0), "angle": _f(max_particles), "ang_vel": _f(max_particles), } return emitter._particle_state def _update_emitter(self, dt: float, transform: Transform, emitter: ParticleEmitterComponent, state: dict): if state["max"] != emitter.max_particles: emitter._particle_state = None state = self._ensure_state(emitter) state["elapsed"] += dt if emitter.emitting and emitter.emitter_lifetime >= 0.0 and state["elapsed"] >= emitter.emitter_lifetime: emitter.emitting = False if emitter.emitting: spawn_count = 0 if emitter.emission_rate > 0.0: state["rate_carry"] += emitter.emission_rate * dt spawn_count += int(state["rate_carry"]) state["rate_carry"] -= int(state["rate_carry"]) if emitter.burst_count > 0 and not emitter.one_shot: state["burst_timer"] += dt while state["burst_timer"] >= emitter.burst_interval: state["burst_timer"] -= emitter.burst_interval spawn_count += emitter.burst_count if emitter._pending_bursts > 0 and emitter.burst_count > 0: spawn_count += emitter.burst_count * emitter._pending_bursts emitter._pending_bursts = 0 if spawn_count > 0: self._spawn_particles(spawn_count, transform, emitter, state) if emitter.one_shot and state["alive"] > 0 and emitter._pending_bursts == 0: emitter.emitting = False self._integrate_particles(dt, emitter, state) def _spawn_particles(self, spawn_count: int, transform: Transform, emitter: ParticleEmitterComponent, state: dict): alive = state["alive"] max_particles = state["max"] free_slots = max_particles - alive count = min(spawn_count, free_slots) if count <= 0: return for i in range(count): idx = alive + i speed = self._random.uniform(emitter.speed_min, emitter.speed_max) angle_deg = emitter.direction_degrees + self._random.uniform( -emitter.spread_degrees * 0.5, emitter.spread_degrees * 0.5 ) angle_rad = math.radians(angle_deg) radial_dist = self._random.uniform(emitter.radial_offset_min, emitter.radial_offset_max) radial_ang = self._random.uniform(0.0, 360.0) radial_rad = math.radians(radial_ang) offset_x = math.cos(radial_rad) * radial_dist offset_y = math.sin(radial_rad) * radial_dist if emitter.local_space: state["x"][idx] = offset_x state["y"][idx] = offset_y else: state["x"][idx] = transform.x + offset_x state["y"][idx] = transform.y + offset_y state["vx"][idx] = math.cos(angle_rad) * speed state["vy"][idx] = math.sin(angle_rad) * speed state["age"][idx] = 0.0 state["life"][idx] = self._random.uniform(emitter.lifetime_min, emitter.lifetime_max) state["size0"][idx] = self._random.uniform(emitter.start_size_min, emitter.start_size_max) state["size1"][idx] = self._random.uniform(emitter.end_size_min, emitter.end_size_max) state["r0"][idx] = int(emitter.start_color[0]) state["g0"][idx] = int(emitter.start_color[1]) state["b0"][idx] = int(emitter.start_color[2]) state["a0"][idx] = int(emitter.start_color[3]) state["r1"][idx] = int(emitter.end_color[0]) state["g1"][idx] = int(emitter.end_color[1]) state["b1"][idx] = int(emitter.end_color[2]) state["a1"][idx] = int(emitter.end_color[3]) state["angle"][idx] = self._random.uniform(0.0, 360.0) state["ang_vel"][idx] = self._random.uniform(emitter.angular_velocity_min, emitter.angular_velocity_max) state["alive"] += count def _integrate_particles(self, dt: float, emitter: ParticleEmitterComponent, state: dict): alive = state["alive"] if alive <= 0: return damping_factor = max(0.0, 1.0 - (emitter.damping * dt)) i = alive - 1 while i >= 0: state["age"][i] += dt if state["age"][i] >= state["life"][i]: alive = self._remove_index(i, state, alive) i -= 1 continue state["vx"][i] *= damping_factor state["vy"][i] *= damping_factor state["vx"][i] += emitter.gravity_x * dt state["vy"][i] += emitter.gravity_y * dt state["x"][i] += state["vx"][i] * dt state["y"][i] += state["vy"][i] * dt state["angle"][i] += state["ang_vel"][i] * dt i -= 1 state["alive"] = alive def _remove_index(self, idx: int, state: dict, alive: int): last = alive - 1 if idx != last: for field in ( "x", "y", "vx", "vy", "age", "life", "size0", "size1", "r0", "g0", "b0", "a0", "r1", "g1", "b1", "a1", "angle", "ang_vel" ): state[field][idx] = state[field][last] return last