Skip to content

pixano_inference.routers.utils

Utils for the routers.

delete_task(task_id) async

Delete a task by revoking it in Celery, terminating it if it is already running.

Source code in pixano_inference/routers/utils.py
async def delete_task(task_id: str) -> None:
    """Revoke the Celery task identified by ``task_id``.

    Args:
        task_id: ID of the task to revoke.
    """
    # terminate=True asks Celery to also stop the task if a worker already runs it.
    celery_app.control.revoke(task_id, terminate=True)

execute_task_request(request, task, settings) async

Execute a task request.

Parameters:

Name Type Description Default
request BaseRequest

Request to execute.

required
task Task

Task to execute.

required
settings Settings

Settings of the app.

required

Returns:

Type Description
CeleryTask

A CeleryTask holding the ID of the queued task, with a PENDING status.

Source code in pixano_inference/routers/utils.py
async def execute_task_request(request: BaseRequest, task: Task, settings: Settings) -> CeleryTask:
    """Check a request against the registered models and queue it for prediction.

    Args:
        request: Request to execute; its ``model`` attribute names the target model.
        task: Task the request is meant for.
        settings: Settings of the app, providing ``models`` and ``models_to_task``.

    Returns:
        A ``CeleryTask`` carrying the ID of the queued task and a pending status.

    Raises:
        HTTPException: 404 when the model is not registered, 400 when the model
            is registered for a different task than ``task``.
    """
    name = request.model
    if name not in settings.models:
        raise HTTPException(404, detail=f"Model {name} is not registered.")

    registered_task = settings.models_to_task[name]
    if task.value != registered_task:
        raise HTTPException(400, detail=f"Model {name} does not support the {task.value} task.")

    # The prediction is sent to the queue derived from the model name.
    target_queue = model_queue_name(name)
    payload = jsonable_encoder(request)
    submitted: AsyncResult = predict.apply_async((payload,), queue=target_queue)
    return CeleryTask(id=submitted.id, status=states.PENDING)

get_task_result(task_id, response_type) async

Get the result of a task.

Parameters:

Name Type Description Default
task_id str

ID of the task to retrieve.

required
response_type type[BaseResponse]

Type of response to return.

required

Returns:

Type Description
CeleryTask | BaseResponse

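A CeleryTask carrying the current status while the task is pending, started, failed, or revoked; otherwise the response built from the result once the task has succeeded.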

Source code in pixano_inference/routers/utils.py
async def get_task_result(task_id: str, response_type: type[BaseResponse]) -> CeleryTask | BaseResponse:
    """Get the result of a task.

    Args:
        task_id: ID of the task to retrieve.
        response_type: Type of response to build when the task has succeeded.

    Returns:
        A ``CeleryTask`` with the current status when the task is pending,
        started, failed or revoked; otherwise an instance of ``response_type``
        built from the task result, with ``id`` and ``status`` set.

    Raises:
        HTTPException: 500 when the task is in any other non-success state.
    """
    task_result = celery_app.AsyncResult(task_id)
    status, result = task_result.status, task_result.result

    # States for which no result payload is returned: only the status is reported.
    # NOTE(review): other Celery states (e.g. RECEIVED, RETRY) fall through to the
    # 500 branch below -- confirm this is intended.
    if status in (states.PENDING, states.STARTED, states.FAILURE, states.REVOKED):
        return CeleryTask(id=task_id, status=status)
    if status != states.SUCCESS:
        raise HTTPException(status_code=500, detail=f"Unknown task status {status}")

    # Build a merged copy rather than mutating the object held by the AsyncResult.
    payload = {**result, "id": task_id, "status": status}
    response = response_type.model_construct(**payload)
    # The response is built before forget() is called on the stored result.
    task_result.forget()
    return response