Skip to content

flask_manager

drunc.utils.flask_manager

Flask application manager utilities for DRUNC.

Classes

CannotStartFlaskManager(message='An error occurred in Drunc.', grpc_error_code=None, details=None, **detail_kwargs)

Bases: DruncCommandException

Exception raised when the Flask manager cannot start.

Source code in drunc/exceptions.py
def __init__(
    self,
    message: str = "An error occurred in Drunc.",
    grpc_error_code=None,
    details=None,  # optional rich error detail
    **detail_kwargs,
):
    """Initialise the exception with a message and optional gRPC metadata.

    Args:
        message: Human-readable description. It is forwarded to the base
            exception and stored on ``self.message`` unless it is None.
        grpc_error_code: Status code to attach to this instance. When None,
            an already existing ``grpc_error_code`` attribute is reused,
            falling back to ``code_pb2.INTERNAL``.
        details: Optional rich error detail; stored on ``self.details``
            only when it is not None.
        **detail_kwargs: Extra keyword arguments, kept verbatim on
            ``self.detail_kwargs``.
    """
    super().__init__(message)

    if message is not None:
        self.message = message

    if grpc_error_code is None:
        grpc_error_code = getattr(self, "grpc_error_code", code_pb2.INTERNAL)
    # Persist the resolved code. It used to be computed and then discarded,
    # so a code supplied by the caller never reached the instance.
    self.grpc_error_code = grpc_error_code

    if details is not None:
        self.details = details

    self.detail_kwargs = detail_kwargs

FlaskManager(name, app, port, workers=1, host='0.0.0.0')

Bases: Thread

Manager for Flask applications running in a separate thread.

It runs a Flask server in a separate thread and lets you start and stop it. Note that it creates an additional endpoint at the route /readystatus, which is used to poll whether the service is up; the user can also provide this endpoint themselves.

To use this code, one can use the following example:

# Build the Flask application and register the REST resource(s) to serve.
from flask import Flask
from flask_restful import Api
app = Flask('some-name')
api = Api(app)
api.add_resource(
    AnEndpointResourceClass, "/endpoint",
)

# Wrap the application in a FlaskManager; `port` must be defined by the caller.
from flask_manager import FlaskManager
manager = FlaskManager(
    port = port,
    app = app,
    name = "some-name"
)

# Start the manager thread, then poll until it reports that it is ready.
manager.start()
while not manager.is_ready():
    from time import sleep
    sleep(0.1)

Then, later on, to stop it:

# Terminates the Gunicorn process and joins the manager thread.
manager.stop()

Initialize a FlaskManager.

Parameters:

Name Type Description Default
name str

The name of the Flask manager.

required
app Flask

The Flask application to manage.

required
port int

The port to run the Flask server on.

required
workers int

The number of Gunicorn workers. Defaults to 1.

1
host str

The host address to bind to. Defaults to "0.0.0.0".

'0.0.0.0'
Source code in drunc/utils/flask_manager.py
def __init__(
    self,
    name: str,
    app: Flask,
    port: int,
    workers: int = 1,
    host: str = "0.0.0.0",
) -> None:
    """Prepare a manager for ``app``; nothing is started here.

    Args:
        name: Name of the manager; also used as the prefix of its logger name.
        app: Flask application that will be served.
        port: Port on which the Flask server runs.
        workers: Number of Gunicorn workers (1 by default).
        host: Address to bind to ("0.0.0.0" by default).
    """
    # The manager is itself a thread and is created as a daemon thread.
    super().__init__(daemon=True)
    self.log = get_logger(f"{name}-flaskmanager", stream_handlers=True)

    # Static configuration handed in by the caller.
    self.name = name
    self.app = app
    self.host = host
    self.port = port
    self.workers = workers

    # Runtime handles; all unset until the server is actually created.
    self.prod_app: GunicornStandaloneApplication | None = None
    self.flask: Process | None = None
    self.gunicorn_pid: int | None = None

    # Lifecycle flags, read through is_ready()/is_terminated() under the lock.
    self.ready_lock = threading.Lock()
    self.ready = False
    self.joined = False
Methods:
__del__()

Cleanup when the FlaskManager is destroyed.

Source code in drunc/utils/flask_manager.py
def __del__(self) -> None:
    """Shut the manager down when the object is finalised.

    Delegates to ``stop()``, the same shutdown path used for an explicit stop.
    """
    self.stop()
is_ready()

Check if the Flask manager is ready to serve requests.

Returns:

Name Type Description
bool bool

True if ready, False otherwise.

Source code in drunc/utils/flask_manager.py
def is_ready(self) -> bool:
    """Report whether the manager is flagged as ready to serve requests.

    Returns:
        bool: Current value of the ``ready`` flag, read under ``ready_lock``.
    """
    # Take a snapshot while holding the lock, then return the copy.
    with self.ready_lock:
        ready_now = self.ready
    return ready_now
is_terminated()

Check if the Flask manager has been terminated.

Returns:

Name Type Description
bool bool

True if terminated, False otherwise.

Source code in drunc/utils/flask_manager.py
def is_terminated(self) -> bool:
    """Report whether the manager is flagged as terminated.

    Returns:
        bool: Current value of the ``joined`` flag, read under ``ready_lock``.
    """
    # Take a snapshot while holding the lock, then return the copy.
    with self.ready_lock:
        terminated_now = self.joined
    return terminated_now
restart_renew()

Restart and renew the Flask manager.

Stops the current instance and creates a new one with the same configuration.

Returns:

Name Type Description
FlaskManager FlaskManager

A new FlaskManager instance with the same settings.

Source code in drunc/utils/flask_manager.py
def restart_renew(self) -> "FlaskManager":
    """Restart and renew the Flask manager.

    Stops the current instance if its thread is still running, then creates
    and starts a new one with the same configuration and blocks until the
    new instance reports ready.

    A thread object can only be started once, so this manager cannot restart
    itself in place. The caller has to rebind the result:

        manager = manager.restart_renew()

    Returns:
        FlaskManager: A new, ready FlaskManager instance with the same settings.

    Raises:
        CannotStartFlaskManager: If the new manager's thread exits before it
            ever becomes ready.
    """
    # Honour the documented contract: shut the running server down before
    # starting its replacement on the same host and port. The is_alive()
    # guard keeps this a no-op when the caller already stopped the manager
    # or never started it (stop() joins the thread, which is only valid
    # once the thread has been started).
    if self.is_alive():
        self.stop()

    fm = FlaskManager(
        name=self.name,
        app=self.app,
        port=self.port,
        workers=self.workers,
        host=self.host,
    )
    fm.start()
    while not fm.is_ready():
        # A thread that has already finished will never become ready;
        # fail loudly instead of polling forever.
        if not fm.is_alive():
            raise CannotStartFlaskManager(
                f"Flask manager '{self.name}' exited before becoming ready "
                f"on {self.host}:{self.port}"
            )
        time.sleep(0.1)
    return fm
run()

Run the Flask server in the thread.

This method is called when the thread is started.

Source code in drunc/utils/flask_manager.py
def run(self) -> None:
    """Thread entry point: serve the Flask application.

    Called when the thread is started; all of the work is delegated to
    ``_create_and_join_flask``.
    """
    self._create_and_join_flask()
stop()

Stop the Flask manager and terminate the Gunicorn process.

Sends SIGTERM to the Gunicorn process and joins the Flask process thread.

Source code in drunc/utils/flask_manager.py
def stop(self) -> None:
    """Stop the Flask manager and terminate the Gunicorn process.

    Sends SIGTERM to the recorded Gunicorn process, terminates the Flask
    process if there is one, and joins this manager's thread.

    The call tolerates a Gunicorn process that has already exited and a
    manager whose thread is not running (never started, or already finished),
    so it can be used from cleanup paths without raising for those cases.
    """
    if self.gunicorn_pid:
        try:
            # https://github.com/benoitc/gunicorn/blob/ab9c8301cb9ae573ba597154ddeea16f0326fc15/docs/source/signals.rst#master-process
            # TOTAL DESTRUCTION
            psutil.Process(self.gunicorn_pid).send_signal(signal.SIGTERM)
        except psutil.NoSuchProcess:
            # The process is already gone; carry on so the rest of the
            # shutdown (terminate + join) still happens.
            self.log.debug(
                "Gunicorn process %s has already exited", self.gunicorn_pid
            )
        if self.flask is not None:
            self.flask.terminate()

    # Joining a thread that was never started raises RuntimeError, and
    # joining a finished thread is a no-op, so only join a live thread.
    if self.is_alive():
        self.join()

GunicornStandaloneApplication(app, options=None)

Bases: BaseApplication

Standalone Gunicorn application wrapper.

Initialize a GunicornStandaloneApplication.

Parameters:

Name Type Description Default
app Flask

The Flask application to run.

required
options dict[str, object] | None

Configuration options for Gunicorn. Defaults to None.

None
Source code in drunc/utils/flask_manager.py
def __init__(
    self,
    app: Flask,
    options: dict[str, object] | None = None,
) -> None:
    """Wrap ``app`` so that Gunicorn can serve it without a config file.

    Args:
        app: The Flask application to run.
        options: Gunicorn settings keyed by setting name. None (the default)
            or an empty mapping means no overrides.
    """
    # Set both attributes before calling the base initialiser.
    # NOTE(review): presumably the Gunicorn base class reads them (through
    # load_config()/load()) during construction - confirm before reordering.
    self.application = app
    self.options = options if options else {}
    super().__init__()
Methods:
load()

Load the Flask application.

Returns:

Name Type Description
Flask Flask

The Flask application.

Source code in drunc/utils/flask_manager.py
def load(self) -> Flask:
    """Hand back the wrapped Flask application.

    Returns:
        Flask: The application object stored on ``self.application``.
    """
    wrapped_app = self.application
    return wrapped_app
load_config()

Load Gunicorn configuration from options.

Source code in drunc/utils/flask_manager.py
def load_config(self) -> None:
    """Apply the stored options to the Gunicorn configuration.

    An option is applied only when its key is a known setting of ``self.cfg``
    and its value is not None; everything else is skipped.
    """
    for option_name, option_value in self.options.items():
        # Skip unknown settings and explicitly unset (None) values.
        if option_name not in self.cfg.settings or option_value is None:
            continue
        self.cfg.set(option_name.lower(), option_value)

Functions:

main()

Main entry point for demonstrating the FlaskManager.

Creates a simple Flask application with a dummy endpoint and starts it.

Source code in drunc/utils/flask_manager.py
def main() -> None:
    """Main entry point for demonstrating the FlaskManager.

    Creates a simple Flask application with a dummy endpoint, serves it
    through a FlaskManager, queries it and stops it, then repeats the same
    check on the instance returned by ``restart_renew()``. Constructing the
    manager is attempted up to ten times, each time on a new port.
    """

    # Minimal resource that exists only for this demonstration.
    class DummyEndpoint(_Resource):
        def post(self) -> None:
            print(request)

        def get(self) -> object:
            return make_response(jsonify({"weeeee": "wooo"}))

    def query_then_stop(running: "FlaskManager") -> None:
        """Check a ready manager, hit /dummy once, stop it and check again."""
        assert not running.is_terminated()
        assert running.is_ready()

        requests.get(f"http://127.0.0.1:{running.port}/dummy")
        print("succesfully got endpoint /dummy")
        running.stop()
        assert running.is_terminated()
        assert not running.is_ready()

    app = Flask("test-app")
    api = Api(app)
    api.add_resource(DummyEndpoint, "/dummy", methods=["GET", "POST"])

    for _ in range(10):
        try:
            manager = FlaskManager(
                port=get_new_port(), app=app, name="test_name", host="127.0.0.1"
            )
        except Exception:
            # Construction failed: retry on the next iteration. Catching
            # Exception rather than using a bare except lets
            # KeyboardInterrupt and SystemExit still abort the demo.
            continue

        manager.start()
        while not manager.is_ready():
            time.sleep(0.1)
        query_then_stop(manager)

        # A stopped manager cannot be restarted in place; take the new one.
        manager = manager.restart_renew()
        query_then_stop(manager)
        break