Skip to content

client

drunc.connectivity_service.client

Classes

ConnectivityServiceClient(session, address)

Source code in drunc/connectivity_service/client.py
def __init__(self, session: str, address: str):
    self.session = session
    self.log = get_logger("utils.ConnectivityServiceClient")

    if address.startswith("http://") or address.startswith("https://"):
        self.address = address
    else:
        # assume the simplest case here
        self.address = f"http://{address}"

    self.log.debug(
        f"Connectivity service address: {self.address}, session: {self.session}"
    )
Functions
is_ready(timeout=10)

Check if the service is ready by polling the connectivity service with an exponential backoff.

Parameters:

Name Type Description Default
timeout int

Maximum time in seconds to wait for the service to become ready

10

Returns:

Type Description

True if service becomes ready within timeout, False otherwise

Source code in drunc/connectivity_service/client.py
def is_ready(self, timeout: int = 10):
    """
    Check if the service is ready by polling the connectivity service
    with an exponential backoff.

    Args:
        timeout: Maximum time in seconds to wait for the service to become ready

    Returns:
        True if service becomes ready within timeout, False otherwise
    """
    start = time.time()
    attempt = 0
    delay = 0.1  # Initial delay in seconds
    max_delay = 2.0  # Maximum delay between attempts

    while time.time() - start < timeout:
        attempt += 1
        elapsed = time.time() - start

        self.log.debug(f"Health check attempt {attempt} at {elapsed:.2f}s elapsed")
        self.log.debug(f"Polling address: {self.address}")
        try:
            r = get(self.address)
        except Exception as e:
            self.log.debug(f"Polling failed with exception: {e}")
            r = None
        # Request failed - service is NOT ready.
        if r is None or not r.ok:
            self.log.debug(
                f"Connectivity service not ready, retrying in {delay:.2f}s"
            )
            time.sleep(delay)
            # Increase delay for next time.
            delay = min(delay * 2, max_delay)
        # Request succeeded - service is ready.
        else:
            self.log.debug(
                f"Connectivity service ready after {attempt} attempts ({elapsed:.2f}s)"
            )
            return True

    self.log.debug(
        f"Connectivity service not ready after {attempt} attempts. Timeout of {timeout}s reached."
    )
    return False
retract_partition(fail_quickly=False, fail_quietly=False)

Retract the whole partition (session) from the connectivity service.

Parameters:

Name Type Description Default
fail_quickly bool

If True, the function will return immediately on failure without raising exceptions. Default is False.

False
fail_quietly bool

If True, the function will suppress all exceptions and log errors as warnings. Default is False.

False
Source code in drunc/connectivity_service/client.py
def retract_partition(
    self, fail_quickly: bool = False, fail_quietly: bool = False
) -> None:
    """
    Retract the whole partition (session) from the connectivity service.

    Args:
        fail_quickly (bool): If True, the function will return immediately on failure without
                             raising exceptions. Default is False.
        fail_quietly (bool): If True, the function will suppress all exceptions and log
                             errors as warnings. Default is False.
    """
    data = {"partition": self.session}
    for i in range(50):
        try:
            self.log.debug(
                f"Retracting session {self.session} on the connectivity service, attempt {i + 1}: {data=}"
            )

            r = http_post(
                self.address + "/retract-partition",
                data=data,
                headers={"Content-Type": "application/json"},
                as_json=True,
                timeout=0.5,
                ignore_errors=True,
            )

            if r.status_code == 404:
                if not fail_quietly:
                    self.log.warning(
                        f"Session {self.session} not found on the connectivity service"
                    )
                break

            r.raise_for_status()
            break

        except (HTTPError, ConnectionError) as e:
            self.log.debug(e)
            if not fail_quickly:
                time.sleep(0.5)

        except Exception as e:
            if fail_quickly:
                if not fail_quietly:
                    self.log.info(
                        f"Could not retract session {self.session} on the connectivity service at the address {self.address}"
                    )
                    self.log.debug(e)
            else:
                raise e

        finally:
            if fail_quickly:
                return

Functions