Source code for xecs._internal.real_time_app

import inspect
import typing
from collections import abc
from time import sleep
from typing import Any

from xecs._internal.commands import Commands
from xecs._internal.component import (
    Component,
    ComponentPool,
    ComponentT,
    MissingPoolError,
)
from xecs._internal.entity_id import EntityId
from xecs._internal.events import EventReader, Events, EventWriter
from xecs._internal.input import Keyboard, Mouse
from xecs._internal.query import Query
from xecs._internal.resource import Resource
from xecs._internal.systems import (
    FixedTimeStepSystems,
    FixedTimeStepSystemSpec,
    OtherParameter,
    PendingStartupSystems,
    PendingSystems,
    StartupSystems,
    System,
    SystemParameter,
    Systems,
    SystemSignatureError,
    SystemSpec,
)
from xecs._internal.time import Time
from xecs._internal.world import World
from xecs.xecs import Duration, Instant, RustApp

P = typing.ParamSpec("P")
R = typing.TypeVar("R")


[docs] class RealTimeAppPlugin: """ A base class for plugins for :class:`.RealTimeApp`. """
[docs] def build(self, app: "RealTimeApp") -> None: """ Add the plugin to an `app`. Parameters: app: The app to add the plugin to. """ pass
[docs] class RealTimeApp: """ An app which runs in real time. Parameters: num_entities: The maximum number of entities which can be spawned. """ def __init__(self, num_entities: int) -> None: self.world = World() self.add_resource(PendingStartupSystems([])) self.add_resource(StartupSystems([])) self.add_resource(PendingSystems([])) self.add_resource(Systems([])) self.add_resource(FixedTimeStepSystems([])) self.add_resource(Events({})) self.add_resource(Mouse(set(), (0, 0))) self.add_resource(Keyboard(set())) self._rust_app = RustApp( num_pools=len(Component.component_ids), num_queries=Query.p_num_queries, ) self._commands = Commands.p_new(self._rust_app, self.world) self._has_run_startup_systems = False self.add_pool(EntityId.create_pool(num_entities))
[docs] def add_plugin(self, plugin: RealTimeAppPlugin) -> None: """ Add a plugin. Parameters: plugin: The plugin. """ plugin.build(self)
[docs] def add_resource(self, resource: Resource) -> None: """ Add a resource. Parameters: resource: The resource. """ self.world.add_resource(resource)
[docs] def add_startup_system(self, system: System) -> None: """ Add a startup system. Parameters: system: The startup system. """ self.world.get_resource(PendingStartupSystems).systems.append(system)
[docs] def add_system( self, system: System, run_condition: Duration | None = None, ) -> None: """ Add a system. Parameters: system: The system. run_condition: The time step between runs of the system. If ``None`` the system will run every frame. """ self.world.get_resource(PendingSystems).systems.append( (system, run_condition) )
def _get_system_args( self, system: abc.Callable[P, R], ) -> tuple[ dict[str, Query[Any]], dict[str, EventReader[Any]], dict[str, OtherParameter], ]: query_args: dict[str, Query[Any]] = {} reader_args: dict[str, EventReader[Any]] = {} other_args: dict[str, OtherParameter] = {} for name, parameter in inspect.signature(system).parameters.items(): origin = typing.get_origin(parameter.annotation) if origin is Query: (component_tuple,) = typing.get_args(parameter.annotation) if issubclass(component_tuple, Component): query_id = self._rust_app.add_query( first_component=Component.component_ids[ component_tuple ], other_components=[], ) query_args[name] = Query.p_new( str(parameter.annotation), query_id, [component_tuple], False, ) else: components = typing.get_args(component_tuple) component_ids = [ Component.component_ids[component] for component in components ] query_id = self._rust_app.add_query( first_component=component_ids[0], other_components=component_ids[1:], ) query_args[name] = Query.p_new( str(parameter.annotation), query_id, components, True, ) elif origin is EventReader: (event_type,) = typing.get_args(parameter.annotation) reader: EventReader[Any] = EventReader() reader_args[name] = reader events = self.world.get_resource(Events) if event_type not in events.writers: events.writers[event_type] = EventWriter() writer = events.writers[event_type] writer.p_readers.append(reader) elif origin is EventWriter: (event_type,) = typing.get_args(parameter.annotation) events = self.world.get_resource(Events) if event_type not in events.writers: events.writers[event_type] = EventWriter() other_args[name] = events.writers[event_type] elif parameter.annotation is Commands: other_args[name] = self._commands elif parameter.annotation is World: other_args[name] = self.world elif issubclass(parameter.annotation, Resource): other_args[name] = self.world.get_resource( parameter.annotation ) else: expected_type = " | ".join( arg.__name__ for arg in typing.get_args(SystemParameter) ) raise SystemSignatureError( f'annotation of parameter "{name}" in ' f'"{system.__name__}" is "{parameter.annotation}" ' "but needs to be " f"{expected_type}" ) return query_args, reader_args, other_args def _run_query(self, query: Query[Any]) -> None: self._assert_has_pools(query.p_components) component_indices = self._rust_app.run_query(query.p_query_id) query.p_result = tuple( pool.p_component.p_new_view_with_indices(indices) for pool, indices in zip( ( self.world.p_get_pool(component) for component in query.p_components ), iter(lambda: component_indices.next(), None), strict=True, ) ) if not query.p_tuple_query: query.p_result = query.p_result[0] def _assert_has_pools( self, components: abc.Iterable[type[Component]], ) -> None: for component in components: if not self.world.has_pool(component): error = MissingPoolError( f"missing a pool for the {component.__name__} component" ) error.add_note("Did you forget to run app.add_pool(...)?") raise error def _process_pending_startup_systems(self) -> None: pending_startup_systems = self.world.get_resource( PendingStartupSystems ) startup_systems = self.world.get_resource(StartupSystems) for system in pending_startup_systems.systems: query_args, reader_args, other_args = self._get_system_args(system) startup_systems.systems.append( SystemSpec( function=system, query_args=query_args, reader_args=reader_args, other_args=other_args, ) ) pending_startup_systems.systems = [] def _process_pending_systems(self) -> None: pending_systems = self.world.get_resource(PendingSystems) systems = self.world.get_resource(Systems) fixed_time_step_systems = self.world.get_resource(FixedTimeStepSystems) for system, run_condition in pending_systems.systems: query_args, reader_args, other_args = self._get_system_args(system) match run_condition: case Duration(): fixed_time_step_systems.systems.append( FixedTimeStepSystemSpec( system, query_args, reader_args, other_args, run_condition, ) ) case None: systems.systems.append( SystemSpec(system, query_args, reader_args, other_args) ) pending_systems.systems = [] def _run_startup_systems(self) -> None: self._has_run_startup_systems = True for system in self.world.get_resource(StartupSystems).systems: for query in system.query_args.values(): self._run_query(query) system.function( **system.query_args, **system.reader_args, **system.other_args, ) for reader in system.reader_args.values(): reader.events.clear() def _run_systems(self) -> None: for system in self.world.get_resource(Systems).systems: for query in system.query_args.values(): self._run_query(query) system.function( **system.query_args, **system.reader_args, **system.other_args, ) for reader in system.reader_args.values(): reader.events.clear() def _run_fixed_time_step_systems( self, time_since_last_update: Duration, ) -> None: for system in self.world.get_resource(FixedTimeStepSystems).systems: system.time_to_simulate += time_since_last_update while system.time_to_simulate >= system.time_step: for query in system.query_args.values(): self._run_query(query) system.function( **system.query_args, **system.reader_args, **system.other_args, ) system.time_to_simulate -= system.time_step for reader in system.reader_args.values(): reader.events.clear()
[docs] def update(self) -> None: """ Run the app for a single step. """ if not self.world.has_resource(Time): self.world.add_resource(Time.default()) if not self._has_run_startup_systems: self._process_pending_startup_systems() self._run_startup_systems() self._process_pending_systems() self._update()
def _update(self) -> None: time = self.world.get_resource(Time) time.update() self._run_systems() self._run_fixed_time_step_systems(time.delta())
[docs] def run( self, frame_time: Duration = Duration.from_nanos(int(1e9 / 60)), max_run_time: Duration | None = None, ) -> None: """ Run the app continuously. Parameters: frame_time: The time between frames. max_run_time: The maximum time to run the app for. """ if not self.world.has_resource(Time): self.world.add_resource(Time.default()) if not self._has_run_startup_systems: self._process_pending_startup_systems() self._run_startup_systems() self._process_pending_systems() self._run(frame_time, max_run_time)
def _run( self, frame_time: Duration, max_run_time: Duration | None, ) -> None: time = self.world.get_resource(Time) while True: start = Instant.now() self._update() if max_run_time is not None and time.elapsed() >= max_run_time: break sleep_time = frame_time.saturating_sub(start.elapsed()) sleep(sleep_time.as_nanos() / 1e9)
[docs] def add_pool(self, pool: ComponentPool[ComponentT]) -> None: """ Add a preallocated pool of components. The `pool` will be used to hold any components which are spawned during runtime. Parameters: pool: The component pool. """ component_id = Component.component_ids[type(pool.p_component)] self._rust_app.add_pool(component_id, pool.p_capacity) self.world.add_pool(pool)