Source code for xecs._internal.events
from typing import Any, Generic, TypeVar
from xecs._internal.resource import Resource
T = TypeVar("T")
[docs]
class EventReader(Generic[T]):
"""
Gives access to events of type ``T``.
"""
__slots__ = ("events",)
events: list[T]
"""The events sent since the last time the system was called."""
def __init__(self) -> None:
self.events = []
[docs]
class EventWriter(Generic[T]):
"""
Sends events of type ``T``.
"""
p_readers: list[EventReader[T]]
def __init__(self) -> None:
self.p_readers = []
[docs]
def send(self, event: T) -> None:
"""
Send an event which can be read by :class:`.EventReader`.
Parameters:
event: The event to send.
"""
for reader in self.p_readers:
reader.events.append(event)
class Events(Resource):
writers: dict[type, EventWriter[Any]]