cqrs_api_app.py

#
import logging
import signal
import sys
from threading import Thread
from typing import List, Type

import flask_injector
import socketio
from flask import Flask
from flask_compress import Compress
from flask_cors import CORS

from applications.api.controllers import command_controller, event_controller, query_controller
from applications.api.tools import from_javascript
from applications.api.websockets.create_socket_io_app import create_socketio_app
from applications.api.websockets.socket_io_emitter import EventToSocketIOBridge
from scuti.domain.cqrs.bus.command_bus import CommandBus
from scuti.domain.cqrs.bus.event_bus import EventBus
from scuti.domain.cqrs.bus.query_bus import QueryBus
from scuti.domain.cqrs.effects import Command, Event, Query
from scuti.domain.model.application.application_error import ApplicationError
from scuti.domain.model.application.domain_application import DomainApplication
from scuti.domain.model.application.net_config import NetConfig
from scuti.domain.model.modules import DomainModule
from scuti.infrastructure.domain.cqrs.bus.build_effect_handlers.asynchronous_class import \
    build_asynchronous_class_effect_handler
from scuti.infrastructure.logging.get_logger import get_logger
from scuti.infrastructure.serialization.from_untyped_dict import from_untyped_dict
from scuti.infrastructure.tools.string import snake_to_upper_camel

logger = get_logger(__name__)
#

This class allows the user to fully customize how the domain model is wired to user infrastructure. It’s responsibility to Scuti are: - Creating the DomainApplication - provide required configuration - Notify start or stop events - Feed effects - Emit effects

class CQRSAPIApp:
#

This is de domain application. Offers a minimal api consumed by your application that enables accepting effects and emitting events to the outside.

    def __init__(self, domains: List[Type[DomainModule]], config: NetConfig,
                 accepted_commands: List[Type[Command]] = None,
                 events_to_publish: List[Type[Event]] = None,
                 accepted_events: List[Type[Event]] = None,
                 accepted_queries: List[Type[Query]] = None):
        self._available_commands = {command_type.__name__: command_type for command_type in accepted_commands or []}
        self._available_events = {event_type.__name__: event_type for event_type in accepted_events or []}
        self._available_queries = {query_type.__name__: query_type for query_type in accepted_queries or []}
        self._events_to_publish = events_to_publish or []
        self._thread_instances: List[Thread] = []
        self._config = config
#
        self._domain_app = DomainApplication(domains=domains, config=config.__dict__)
#

Let’s die with some dignity

        def signal_handler(sig, frame):
            logger.info("Stop requested")
            self._domain_app.stop()
            sys.exit(0)

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGHUP, signal_handler)
#

Boring Flask stuff

        get_logger("engineio").setLevel(logging.ERROR)
        get_logger("werkzeug").setLevel(logging.ERROR)
        self._api_app = Flask(__name__)
        self._api_app.config["SECRET_KEY"] = "VeryS3cret1275"
        CORS(self._api_app, resources={r"/*": {"origins": "*"}})
        Compress(self._api_app)
#

The injector is the core of our domain model. Holds all dependencies and enables building all required objects

        injector = self._domain_app.injector()
        flask_injector.FlaskInjector(app=self._api_app, injector=injector)
#

There are standard Flask controllers used to receive effects from other systems. See controllers.py

        self._api_app.add_url_rule("/commands",
                                   view_func=command_controller(injector.get(CommandBus), self._available_commands),
                                   provide_automatic_options=None,
                                   methods=["POST"])
        self._api_app.add_url_rule("/queries",
                                   view_func=query_controller(injector.get(QueryBus), self._available_queries),
                                   provide_automatic_options=None,
                                   methods=["POST"])
        self._api_app.add_url_rule("/events",
                                   view_func=event_controller(injector.get(EventBus), self._available_events),
                                   provide_automatic_options=None,
                                   methods=["POST"])
#

Have a fallback error manager, this should never be called as Scuti captures all exceptions

        self._api_app.register_error_handler(Exception, self.__handle_internal_error)
#

Configure socket.io server

        self._socketio_app = create_socketio_app(self._api_app)
#

Allow commands to come via websockets, this is explained below

        self._socketio_app.on("action", lambda s, m: self.__handle_websocket_actions(s, m))
#

Manage socket.io disconnections so we can handle a domain event that sets the user as offline

        self._socketio_app.on("disconnect",
                              lambda s: self.__handle_websocket_actions(s, self.__create_disconnect_action(s)))

        injector.binder.bind(socketio.Server, self._socketio_app)
#

Make sure that all events that are sent to other domains are published using our system of choice

        [self._domain_app.event_bus.subscribe(event,
                                              build_asynchronous_class_effect_handler(EventToSocketIOBridge,
                                                                                      None,
                                                                                      injector))
         for event in events_to_publish]
#

Errors are sent to the event bus to an effect handler can act on an error

    def __handle_internal_error(self, error: Exception, http_status_code: int = 500):
#
        response_body = """{"application_error": {"status_code": %d ,"message": "%s"}}""" % (
            http_status_code, error.__str__())
        self._domain_app.event_bus.handle(ApplicationError(error=str(error), stack_trace=error.__traceback__))
        return response_body, http_status_code
#

Simulate a session disconnected action coming from the frontend in case of a websocket disconnection

    def __create_disconnect_action(self, s: str):
#
        return {
            "type": "server/SESSION_DISCONNECTED",
            "data": {
                "sessionId": s
            }}
#

In this case we are receiving AssociateUserToSession command using socket.io websocket so we can obtain session id and register that session Id with a user. Commands / events could also come using websockets so here Commands or Events are created and handled by the corresponding bus. Made as an example.

    def __handle_websocket_actions(self, sid: str, message: dict):
#
        logger.debug(f"action received: {sid} {message}")
        message["type"] = snake_to_upper_camel(message["type"].split("/")[1])
        if message["type"] == "AssociateUserToSession":
            message = self.__add_session_id(message, sid)
        bus = None
        effect_type = None
        if message["type"] in self._available_commands:
            bus = self._domain_app.command_bus
            effect_type = self._available_commands[message["type"]]
        elif message["type"] in self._available_events:
            bus = self._domain_app.event_bus
            effect_type = self._available_events[message["type"]]
        effect = from_untyped_dict(effect_type, from_javascript(message["data"]))
        bus.handle(effect)
#
    def __add_session_id(self, message: dict, session_id: str):
        return {**message, "data": {**message["data"], "sessionId": session_id}}
#

Start the app. Notify all interested parties

    def start(self):
#
        self._domain_app.start()
        self._api_app.run(threaded=True, host=self._config.host, port=self._config.port, debug=False)
#

Stop the app. Notify all interested parties

    def stop(self):
#
        self._domain_app.stop()