from dataclasses import replace
from typing import List, Tuple
from plum import dispatch
from domain.games.tic_tac_toe.board import TicTacToeBoard
from domain.games.tic_tac_toe.commands import CreateGame, JoinGame, PlaceMark
from domain.games.tic_tac_toe.events import BoardUpdated, GameCreated, GameEnded, GameErrorOccurred, GameStarted, \
GameStateReadyToBeCleaned, MarkPlaced, TurnTimeout, WaitingForPlayerPlay
from domain.games.tic_tac_toe.game import GameInProgress, GameWaitingForPlayers
from domain.games.tic_tac_toe.game_repository import ByGameId
from domain.games.tic_tac_toe.types import GameErrorReasons, GameStage
from domain.operation_id import OperationId
from domain.users.events import PlayerJoinedAGame
from scuti.domain.cqrs.bus.effect_handler import ManagedStateEffectHandler
from scuti.domain.cqrs.bus.state_management.commands import DeleteState
from scuti.domain.cqrs.bus.state_management.effect_to_state_mapping import state_fetcher
from scuti.domain.cqrs.bus.state_management.evolve import evolve
from scuti.domain.cqrs.effects import Effect
from scuti.domain.cqrs.event_scheduler.commands import CancelScheduledEvents, ScheduleEvent
from scuti.domain.time.units import Millisecond
This is an “effect handler”. Its mission is to receive effects and calculate state changes and create derived effects. These effect handlers can be used to: - Tie an entity to the busses, so it can communicate with the outside world - As a saga to model a real world procedure - Just as a stateless command / event handler - As a projection that adapts internal data model to the client needs
class TicTacToeGame(ManagedStateEffectHandler):
turn_timeout = Millisecond(20000)
Create one handle
method for every effect type you want to handle. In this case the effect CreateGame
is
a creational effect, so it creates a new entity so there is no previous state.
You will receive the event in this method parameters and Scuti expects you to return a tuple containing next
state and a list of effects that have been generated as CreateGame
consequences.
@dispatch
def handle(self, command: CreateGame):
next_state = GameWaitingForPlayers(id=command.game_id)
return next_state, [
GameCreated(game_id=next_state.id,
creator=command.creator,
stage=GameStage.WAITING_FOR_PLAYERS,
parent_operation_id=command.operation_id)]
When an entity has already been created Scuti needs some way to decide how current state should be retrieved
based on the effect being handled. This is the mission of state_fetcher
annotation.
Creating state fetchers requires creating a function that will receive the current effect being handled,
the repo associated to this effect handler and it is expected to return current state for this entity.
So something like:
ByGameId = lambda eff, repo: repo.by_id(eff.game_id)
When there is a state fetcher handle
signature changes to receive state as a first parameter.
In this case we’re using several state types to represent the game stages. To change the type of the state
use the helper function evolve
. With these types you can have specific effect handlers for a given stage
of the entity.
See: domain/games/tic_tac_toe/game.py
@dispatch
@state_fetcher(ByGameId)
def handle(self, state: GameWaitingForPlayers, effect: JoinGame) -> Tuple[GameWaitingForPlayers | GameInProgress,
List[Effect]]:
number_of_players = len(state.players)
if number_of_players == 0:
next_state = replace(state, players=[*state.players, effect.player_id])
return next_state, [
PlayerJoinedAGame(game_id=next_state.id, player_id=effect.player_id,
parent_operation_id=effect.operation_id)]
elif number_of_players == 1:
next_state = evolve(state, GameInProgress,
players=[*state.players, effect.player_id],
board=TicTacToeBoard(),
stage=GameStage.IN_PROGRESS,
waiting_for_player=state.players[0])
return next_state, [
PlayerJoinedAGame(game_id=next_state.id, player_id=effect.player_id,
parent_operation_id=effect.operation_id),
GameStarted(game_id=next_state.id, players=next_state.players, board=next_state.board.to_list()),
*self._next_turn(next_state)
]
else:
return state, [
GameErrorOccurred(reason=GameErrorReasons.ALL_PLAYERS_ALREADY_JOINED,
parent_operation_id=effect.operation_id,
player=effect.player_id, game_id=effect.game_id)]
@dispatch
@state_fetcher(ByGameId)
def handle(self, state: GameInProgress, command: PlaceMark) -> Tuple[GameInProgress, List[Effect]]:
error_effects = []
final_effects = [CancelScheduledEvents(operation_id=OperationId(), key=str(state.id))]
if state.waiting_for_player != command.player:
error_effects += [GameErrorOccurred(reason=GameErrorReasons.PLAYER_CAN_NOT_PLAY,
player=command.player,
game_id=state.id,
parent_operation_id=command.operation_id)]
elif state.board.is_off_limits(command.x, command.y):
error_effects += [GameErrorOccurred(reason=GameErrorReasons.POSITION_OFF_LIMITS,
player=command.player,
game_id=state.id,
parent_operation_id=command.operation_id
)]
elif not state.board.is_cell_free(command.x, command.y):
error_effects += [GameErrorOccurred(reason=GameErrorReasons.POSITION_ALREADY_FILLED,
player=command.player,
game_id=state.id,
parent_operation_id=command.operation_id)]
elif state.stage != GameStage.IN_PROGRESS:
error_effects += [GameErrorOccurred(reason=GameErrorReasons.GAME_ALREADY_ENDED,
player=command.player,
game_id=state.id,
parent_operation_id=command.operation_id)]
if error_effects:
return state, error_effects + final_effects
next_state = state.place(command.player, command.x, command.y)
stage = next_state.stage
if stage == GameStage.IN_PROGRESS:
You can use functions to create you list of effects
next_effects = self._next_turn(next_state)
else:
next_effects = [GameEnded(game_id=next_state.id,
result=stage,
winner=next_state.winner)]
return next_state, [
*final_effects,
MarkPlaced(game_id=next_state.id, player=command.player, x=command.x, y=command.y,
parent_operation_id=command.operation_id),
BoardUpdated(game_id=next_state.id, board=next_state.board.to_list()),
*next_effects,
]
You can listen to your own events. Usually in these cases it is more efficient to extract a function that creates all derived events/commands.
@dispatch
@state_fetcher(ByGameId)
def handle(self, state: GameInProgress, command: TurnTimeout) -> Tuple[GameInProgress, List[Effect]]:
next_state = state.cancel_game()
return next_state, [GameEnded(game_id=next_state.id,
result=next_state.stage,
winner=next_state.winner)]
@dispatch
@state_fetcher(ByGameId)
def handle(self, state: GameInProgress, event: GameEnded) -> Tuple[GameInProgress, List[Effect]]:
return state, [ScheduleEvent(GameStateReadyToBeCleaned(game_id=event.game_id),
when=Millisecond(10000), key=str(event.game_id),
operation_id=OperationId())]
You can delete states by issuing a DeleteState
command
@dispatch
@state_fetcher(ByGameId)
def handle(self, state: GameInProgress, event: GameStateReadyToBeCleaned) -> Tuple[None, List[Effect]]:
return None, [DeleteState(state.id)]
def _next_turn(self, state: GameInProgress):
return [
WaitingForPlayerPlay(game_id=state.id, player_id=state.waiting_for_player, timeout=self.turn_timeout),
You can schedule events for the future
ScheduleEvent(TurnTimeout(game_id=state.id, player_id=state.waiting_for_player),
operation_id=OperationId(), key=str(state.id), when=self.turn_timeout)
]