Skip to content

pixano_inference.ray.deployment

Ray actor wrapper for InferenceModel subclasses.

create_model_deployment(model_class, config)

Wrap an InferenceModel subclass as a Ray remote actor.

Creates a Ray actor class with:

- `predict(input_data)` method forwarding to the model
- `get_metadata()` method
- `get_stats()` method (request count, avg time)
- `unload()` method

The actor's __init__ instantiates the model class and calls load_model(). Ray actor options (GPU/CPU/memory) come from config.resources.

Parameters:

| Name          | Type                    | Description                           | Default  |
| ------------- | ----------------------- | ------------------------------------- | -------- |
| `model_class` | `type[InferenceModel]`  | An InferenceModel subclass to deploy. | required |
| `config`      | `ModelDeploymentConfig` | Deployment configuration.             | required |

Returns:

Type Description
Any

A Ray remote actor handle (already created and running).

Source code in pixano_inference/ray/deployment.py
def create_model_deployment(
    model_class: type[InferenceModel],
    config: ModelDeploymentConfig,
) -> Any:
    """Wrap an InferenceModel subclass as a Ray remote actor.

    Creates a Ray actor class with:
    - ``predict(input_data)`` method forwarding to the model
    - ``get_metadata()`` method
    - ``get_stats()`` method (request count, avg time)
    - ``unload()`` method

    The actor's ``__init__`` instantiates the model class and calls
    ``load_model()``. Ray actor options (GPU/CPU/memory) come from
    ``config.resources``.

    Args:
        model_class: An InferenceModel subclass to deploy.
        config: Deployment configuration.

    Returns:
        A Ray remote actor handle (already created and running).

    """
    ray_actor_options: dict[str, Any] = {
        "num_gpus": config.resources.num_gpus,
        "num_cpus": config.resources.num_cpus,
    }
    if config.resources.memory_mb is not None:
        # Ray's ``memory`` option is in bytes; the config value is in MiB.
        ray_actor_options["memory"] = config.resources.memory_mb * 1024 * 1024

    # Capture in closure so the actor body refers to plain locals rather than
    # the outer function's parameters.
    _model_class = model_class
    _config = config

    @ray.remote(**ray_actor_options)
    class ModelActor:
        """Ray actor wrapping an InferenceModel."""

        def __init__(self) -> None:
            # Instantiate and load the model eagerly so the actor is fully
            # ready once ``ready()`` returns (see bottom of enclosing function).
            self._model = _model_class(_config)
            self._model.load_model()
            self._request_count = 0
            self._total_processing_time = 0.0
            logger.info(f"Actor '{_config.name}' initialized with {_model_class.__name__}")

        def predict(self, input_data: Any) -> Any:
            """Run inference.

            Args:
                input_data: Task-specific Input object.

            Returns:
                Task-specific Output object.
            """
            # perf_counter() is monotonic; time.time() can jump (e.g. NTP
            # adjustments) and corrupt the timing statistics.
            start_time = time.perf_counter()
            self._request_count += 1
            try:
                return self._model.predict(input_data)
            finally:
                # Accumulate elapsed time even when predict() raises, so
                # request_count and total_processing_time stay consistent.
                self._total_processing_time += time.perf_counter() - start_time

        def get_metadata(self) -> dict[str, Any]:
            """Get model metadata.

            Returns:
                Model metadata dictionary.
            """
            return self._model.metadata

        def get_stats(self) -> dict[str, Any]:
            """Get deployment statistics.

            Returns:
                Statistics dictionary.
            """
            return {
                "model_name": _config.name,
                "capability": _config.capability,
                "model_class": _config.model_class,
                "request_count": self._request_count,
                "total_processing_time": self._total_processing_time,
                "avg_processing_time": (
                    # 0.0 (not int 0) keeps the value a float in both branches.
                    self._total_processing_time / self._request_count if self._request_count > 0 else 0.0
                ),
            }

        def unload(self) -> None:
            """Unload the model and free resources."""
            self._model.unload()
            logger.info(f"Actor '{_config.name}' unloaded")

        def ready(self) -> bool:
            """Check if the actor is ready (model loaded).

            Returns:
                True when ready.
            """
            return True

    # Create the actor and block until __init__ has finished (model loaded).
    handle = ModelActor.remote()  # type: ignore[attr-defined]
    ray.get(handle.ready.remote())
    return handle