Source code for simfantasy.simulator

# -*- coding: utf-8 -*-
"""Contains the primary classes and code required for running and managing simulations."""

import logging
import queue
import re
from datetime import datetime, timedelta
from typing import List, Optional, Pattern, TYPE_CHECKING, Tuple

import humanfriendly
import pandas as pd

from simfantasy.reporting import TerminalReporter

    from import Actor
    from simfantasy.event import Event

[docs]def configure_logging(log_level: int = None) -> None: """Set logging options. Arguments: log_level (Optional[int]): The minimum priority level a message needs to be shown. """ if log_level is None: log_level = logging.INFO logging.basicConfig( level=log_level, format='[%(levelname)s] %(message)s (%(name)s,%(lineno)d)', )
LOGGER = logging.getLogger(__name__)
[docs]class Simulation: """Business logic for managing the simulated combat encounter and subsequent reporting. Arguments: combat_length (Optional[datetime.timedelta]): Desired combat length. Default: 5 minutes. log_level (Optional[int]): Minimum message level necessary to see logger output. Default: :obj:`logging.INFO`. log_event_filter (Optional[str]): Pattern for filtering logging output to only matching class names. Default: None. execute_time (Optional[datetime.timedelta]): Length of time to allow jobs to use "execute" actions. Default: 1 minute. log_pushes (Optional[bool]): True to show events being placed on the queue. Default: True. log_pops (Optional[bool]): True to show events being popped off the queue. Default: True. iterations (Optional[int]): Number of encounters to simulate. Default: 100. log_action_attempts (Optional[bool]): True to log actions attempted by :class:`` decision engines. Attributes: actors (List[]): Actors involved in the encounter. combat_length (datetime.timedelta): Length of the encounter. current_iteration (int): Current iteration index. current_time (datetime.datetime): "In game" timestamp. events (queue.PriorityQueue[simfantasy.event.Event]): Heapified list of upcoming events. execute_time (datetime.timedelta): Length of time to allow jobs to use "execute" actions. iterations (int): Number of encounters to simulate. Default: 100. log_action_attempts (bool): True to log actions attempted by :class:`` decision engines. log_event_filter (Optional[Pattern]): Pattern for filtering logging output to only matching class names. log_pops (bool): True to show events being popped off the queue. Default: True. log_pushes (bool): True to show events being placed on the queue. Default: True. start_time (datetime.datetime): Time that combat started. """ def __init__(self, combat_length: timedelta = None, log_level: int = None, log_event_filter: str = None, execute_time: timedelta = None, log_pushes: bool = None, log_pops: bool = None, iterations: int = None, log_action_attempts: bool = None) -> None: # FIXME Do I even need to set these here? They aren't mutable. if combat_length is None: combat_length = timedelta(minutes=5) self.combat_length: timedelta = combat_length if execute_time is None: execute_time = timedelta(seconds=60) self.execute_time: timedelta = execute_time if iterations is None: iterations = 100 self.iterations: int = iterations self.log_event_filter: Optional[Pattern] = None if log_event_filter is not None: self.log_event_filter = re.compile(log_event_filter) if log_pushes is None: log_pushes = True self.log_pushes: bool = log_pushes if log_pops is None: log_pops = True self.log_pops: bool = log_pops if log_action_attempts is None: log_action_attempts = False self.log_action_attempts: bool = log_action_attempts configure_logging(log_level) self.actors: List[Actor] = [] self.current_iteration: int = 0 self.start_time: datetime = self.current_time: datetime = queue.PriorityQueue[ # pylint: disable=unsubscriptable-object Tuple[datetime, datetime, Event] ] = queue.PriorityQueue() @property def in_execute(self) -> bool: """Indicate whether the encounter is currently in an "execute" phase. "Execute" phases are usually when an enemy falls below a certain health percentage, allowing actions such as :class:`` to be used. Returns: bool: True if the encounter is in an execute phase, False otherwise. Examples: A fresh simulation that has just started a moment ago: >>> sim = Simulation( ... combat_length=timedelta(seconds=60), ... execute_time=timedelta(seconds=30) ... ) >>> sim.start_time = sim.current_time = >>> print("Misery's End") if sim.in_execute else print('Heavy Shot') Heavy Shot And now, if we adjust the start time to force us halfway into the execute phase: >>> sim.current_time += timedelta(seconds=45) >>> print("Misery's End") if sim.in_execute else print('Heavy Shot') Misery's End """ return self.current_time + self.execute_time >= self.start_time + self.combat_length
[docs] def unschedule(self, event) -> bool: """Unschedule an event, ensuring that it is not executed. Does not "remove" the event. In actuality, flags the event itself as unscheduled to prevent having to resort the events list and subsequently recalculate the heap invariant. Arguments: event (simfantasy.event.Event): The event to unschedule. Returns: bool: True if the event was unscheduled without issue. False if an error occurred, specifically a desync bug between the game clock and the event loop. Examples: .. testsetup:: >>> from simfantasy.event import Event >>> sim = Simulation() >>> sim.start_time = sim.current_time = >>> class MyEvent(Event): ... def execute(self): ... pass >>> event = MyEvent(sim) Unscheduling an upcoming event: >>> sim.schedule(event) >>> event.unscheduled False >>> sim.unschedule(event) True >>> event.unscheduled True Rescheduling a previously-unscheduled event will reset its unscheduled flag: >>> event.unscheduled True >>> sim.schedule(event) >>> event.unscheduled False Unscheduling an event that has already occurred will fail: >>> sim.schedule(event, timedelta(seconds=-30)) >>> event.timestamp < sim.current_time True >>> sim.unschedule(event) False >>> event.unscheduled False """ if event.timestamp < self.current_time: # Some event desync clearly happened. LOGGER.warning('[%s] %s Wanted to unschedule event past event %s at %s', self.current_iteration, self.relative_timestamp, event, format(abs(event.timestamp - self.start_time).total_seconds(), '.3f')) return False if self.log_event_filter is None or self.log_event_filter.match( event.__class__.__name__) is not None: LOGGER.debug('[%s] XX %s %s', self.current_iteration, format(abs(event.timestamp - self.start_time).total_seconds(), '.3f'), event) event.unscheduled = True return True
[docs] def schedule(self, event, delta: timedelta = None) -> None: """Schedule an event to occur in the future. Arguments: event (simfantasy.event.Event): The event to schedule. delta (Optional[datetime.timedelta]): An optional amount of time to wait before the event should be executed. When delta is None, the event will be scheduled for the current timestamp, and executed after any preexisting events already scheduled for the current timestamp are finished. Examples: .. testsetup:: >>> from simfantasy.event import Event >>> sim = Simulation() >>> sim.start_time = sim.current_time = >>> class MyEvent(Event): ... def execute(self): ... pass >>> event = MyEvent(sim) >>> event.timestamp is None True >>> sim.schedule(event) >>> event.timestamp is sim.current_time True """ if delta is None: event.timestamp = self.current_time else: event.timestamp = self.current_time + delta,, event)) event.unscheduled = False if self.log_pushes is True: if self.log_event_filter is None or self.log_event_filter.match( event.__class__.__name__) is not None: LOGGER.debug('[%s] => %s %s', self.current_iteration, format(abs(event.timestamp - self.start_time).total_seconds(), '.3f'), event)
[docs] def run(self) -> None: """Run the simulation and process all events.""" from simfantasy.event import ActorReadyEvent, CombatStartEvent, CombatEndEvent, \ ServerTickEvent auras_df = pd.DataFrame() damage_df = pd.DataFrame() resources_df = pd.DataFrame() try: # Create a friendly progress indicator for the user. with humanfriendly.Spinner(label='Simulating', total=self.iterations) as spinner: # Store iteration runtimes so we can predict overall runtime. iteration_runtimes: List[timedelta] = [] for iteration in range(self.iterations): pd_runtimes = pd.Series(iteration_runtimes) iteration_start = self.current_iteration = iteration # Schedule the bookend events. self.schedule(CombatStartEvent(sim=self)) self.schedule(CombatEndEvent(sim=self), self.combat_length) # Schedule the server ticks. for delta in range(3, int(self.combat_length.total_seconds()), 3): self.schedule(ServerTickEvent(sim=self), delta=timedelta(seconds=delta)) # TODO Maybe move this to Actor#arise? # Tell the actors to get ready. for actor in self.actors: self.schedule(ActorReadyEvent(sim=self, actor=actor)) # Start the event loop. while not _, _, event = # Ignore events that are flagged as unscheduled. if event.unscheduled is True: continue # Some event desync clearly happened. if event.timestamp < self.current_time: LOGGER.critical( '[%s] %s %s timestamp %s before current timestamp', self.current_iteration, self.relative_timestamp, event, (event.timestamp - self.start_time).total_seconds() ) # Update the simulation's current time to the latest event. self.current_time = event.timestamp if self.log_pops is True: if self.log_event_filter is None or self.log_event_filter.match( event.__class__.__name__) is not None: LOGGER.debug( '[%s] <= %s %s', self.current_iteration, format( abs(event.timestamp - self.start_time).total_seconds(), '.3f' ), event ) # Handle the event. event.execute() if not # type: ignore # Build statistical dataframes for the completed iteration. for actor in self.actors: auras_df = auras_df.append(pd.DataFrame(actor.statistics['auras'])) damage_df = damage_df.append(pd.DataFrame(actor.statistics['damage'])) resources_df = resources_df.append( pd.DataFrame(actor.statistics['resources'])) # Add the iteration runtime to the collection. iteration_runtimes.append( - iteration_start) auras_df = auras_df.astype(dtype={ 'aura': 'category', 'target': 'category', }) damage_df = damage_df.astype(dtype={ 'action': 'category', 'source': 'category', 'target': 'category', }) # Update our fancy progress indicator with the runtime estimation. spinner.label = 'Simulating ({0})'.format( (pd_runtimes.mean() * (self.iterations - self.current_iteration))) spinner.step(iteration)'Finished %s iterations in %s (mean %s).\n', self.iterations, pd_runtimes.sum(), pd_runtimes.mean()) except KeyboardInterrupt: # Handle SIGINT. LOGGER.critical('Interrupted at %s / %s iterations after %s.\n', self.current_iteration, self.iterations, pd_runtimes.sum()) # TODO Everything. auras_df.set_index('iteration', inplace=True) damage_df.set_index('iteration', inplace=True) resources_df.set_index('iteration', inplace=True) TerminalReporter(self, auras=auras_df, damage=damage_df, resources=resources_df).report() # HTMLReporter(self, df).report()'Quitting!')
@property def relative_timestamp(self) -> str: """Return a formatted string containing the number of seconds since the simulation began. Returns: str: A string, with precision to the thousandths. Examples: .. testsetup:: >>> sim = Simulation() >>> sim.start_time = For a simulation that has been running for 5 minutes (300 seconds): >>> sim.current_time = sim.start_time + timedelta(minutes=5) >>> sim.relative_timestamp '300.000' And in another 30 seconds: >>> sim.current_time += timedelta(seconds=30) >>> sim.relative_timestamp '330.000' """ return format((self.current_time - self.start_time).total_seconds(), '.3f')