tabbyAPI-ollama/endpoints/OAI/utils/completion.py
Brian Dashore 6e48bb420a
Model: Fix inline loading and draft key (#225)
* Model: Fix inline loading and draft key

There was a mismatch between the new config.yml structure and the existing keys:
the "draft" key became "draft_model" without updating both the API request and
inline loading keys.

For API requests, "draft" is still supported as a legacy key, but the
"draft_model" key is preferred (a brief sketch of this alias handling follows
this commit message).

Signed-off-by: kingbri <bdashore3@proton.me>

* OAI: Add draft model dir to inline load

This was not pushed before, which caused errors where the kwargs were None.

Signed-off-by: kingbri <bdashore3@proton.me>

* Model: Fix draft args application

Draft model args weren't being applied because they were reset by the old
override behavior.

Signed-off-by: kingbri <bdashore3@proton.me>

* OAI: Change embedding model load params

Use embedding_model_name to be in line with the config.

Signed-off-by: kingbri <bdashore3@proton.me>

* API: Fix parameter for draft model load

Alias the name parameter to draft_model_name.

Signed-off-by: kingbri <bdashore3@proton.me>

* API: Fix parameter for template switch

Add prompt_template_name to be more descriptive.

Signed-off-by: kingbri <bdashore3@proton.me>

* API: Fix parameter for model load

Alias the name parameter to model_name for config parity.

Signed-off-by: kingbri <bdashore3@proton.me>

* API: Add alias documentation

Signed-off-by: kingbri <bdashore3@proton.me>

---------

Signed-off-by: kingbri <bdashore3@proton.me>
2024-10-24 23:35:05 -04:00
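
For illustration only, here is a minimal sketch of how the preferred keys and their
legacy aliases described above can be accepted with pydantic v2. The class and field
names below are hypothetical and are not tabbyAPI's actual request models:

from typing import Optional

from pydantic import AliasChoices, BaseModel, Field


class ModelLoadRequest(BaseModel):
    # Hypothetical field: "model_name" is preferred, bare "name" stays as a legacy alias
    name: str = Field(validation_alias=AliasChoices("model_name", "name"))
    # "draft_model" is preferred; the old "draft" key is still accepted
    draft_model: Optional[dict] = Field(
        default=None, validation_alias=AliasChoices("draft_model", "draft")
    )


# Both payloads validate to the same object:
ModelLoadRequest.model_validate({"model_name": "some-model", "draft_model": {}})
ModelLoadRequest.model_validate({"name": "some-model", "draft": {}})

The same pattern would cover the prompt_template_name and embedding_model_name
parameters mentioned in the commits above.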


"""
Completion utilities for OAI server.
Also serves as a common module for completions and chat completions.
"""
import asyncio
import pathlib
from asyncio import CancelledError
from copy import deepcopy
from fastapi import HTTPException, Request
from typing import List, Union
from loguru import logger
from common import model
from common.auth import get_key_permission
from common.networking import (
get_generator_error,
handle_request_disconnect,
handle_request_error,
request_disconnect_loop,
)
from common.tabby_config import config
from common.utils import unwrap
from endpoints.OAI.types.completion import (
CompletionRequest,
CompletionResponse,
CompletionRespChoice,
CompletionLogProbs,
)
from endpoints.OAI.types.common import UsageStats


def _create_response(
request_id: str, generations: Union[dict, List[dict]], model_name: str = ""
):
"""Create a completion response from the provided choices."""
# Convert the single choice object into a list
if not isinstance(generations, list):
generations = [generations]
choices: List[CompletionRespChoice] = []
for index, generation in enumerate(generations):
logprob_response = None
token_probs = unwrap(generation.get("token_probs"), {})
if token_probs:
logprobs = unwrap(generation.get("logprobs"), [])
offset = unwrap(generation.get("offset"), [])
logprob_response = CompletionLogProbs(
text_offset=offset if isinstance(offset, list) else [offset],
token_logprobs=token_probs.values(),
tokens=token_probs.keys(),
top_logprobs=logprobs if isinstance(logprobs, list) else [logprobs],
)
# The index can be located in the generation itself
choice = CompletionRespChoice(
index=unwrap(generation.get("index"), index),
finish_reason=generation.get("finish_reason"),
text=unwrap(generation.get("text"), ""),
logprobs=logprob_response,
)
choices.append(choice)
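    # Usage stats are read from the last generation in the list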
prompt_tokens = unwrap(generations[-1].get("prompt_tokens"), 0)
completion_tokens = unwrap(generations[-1].get("generated_tokens"), 0)
response = CompletionResponse(
id=f"cmpl-{request_id}",
choices=choices,
model=model_name,
usage=UsageStats(
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=prompt_tokens + completion_tokens,
),
)
return response


async def _stream_collector(
task_idx: int,
gen_queue: asyncio.Queue,
prompt: str,
request_id: str,
abort_event: asyncio.Event,
**kwargs,
):
"""Collects a stream and places results in a common queue"""
try:
new_generation = model.container.generate_gen(
prompt, request_id, abort_event, **kwargs
)
async for generation in new_generation:
generation["index"] = task_idx
await gen_queue.put(generation)
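            # A finish_reason marks the end of this stream; stop collecting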
if "finish_reason" in generation:
break
except Exception as e:
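        # Surface the error to the consumer through the shared queue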
await gen_queue.put(e)


async def load_inline_model(model_name: str, request: Request):
"""Load a model from the data.model parameter"""
# Return if the model container already exists and the model is fully loaded
if (
model.container
and model.container.model_dir.name == model_name
and model.container.model_loaded
):
return
# Inline model loading isn't enabled or the user isn't an admin
if not get_key_permission(request) == "admin":
error_message = handle_request_error(
f"Unable to switch model to {model_name} because "
+ "an admin key isn't provided",
exc_info=False,
).error.message
raise HTTPException(401, error_message)
if not config.model.inline_model_loading:
logger.warning(
f"Unable to switch model to {model_name} because "
'"inline_model_loading" is not True in config.yml.'
)
return
model_path = pathlib.Path(config.model.model_dir)
model_path = model_path / model_name
# Model path doesn't exist
if not model_path.exists():
logger.warning(
f"Could not find model path {str(model_path)}. Skipping inline model load."
)
return
# Load the model and also add draft dir
await model.load_model(
model_path,
draft_model=config.draft_model.model_dump(include={"draft_model_dir"}),
)


async def stream_generate_completion(
data: CompletionRequest, request: Request, model_path: pathlib.Path
):
"""Streaming generation for completions."""
abort_event = asyncio.Event()
gen_queue = asyncio.Queue()
gen_tasks: List[asyncio.Task] = []
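    # Watch for client disconnects so in-flight generations can be aborted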
disconnect_task = asyncio.create_task(request_disconnect_loop(request))
try:
logger.info(f"Received streaming completion request {request.state.id}")
gen_params = data.to_gen_params()
for n in range(0, data.n):
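            # Deepcopy gen params above the first index
            # to ensure nested structures aren't shared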
if n > 0:
task_gen_params = deepcopy(gen_params)
else:
task_gen_params = gen_params
gen_task = asyncio.create_task(
_stream_collector(
n,
gen_queue,
data.prompt,
request.state.id,
abort_event,
**task_gen_params,
)
)
gen_tasks.append(gen_task)
# Consumer loop
while True:
if disconnect_task.done():
abort_event.set()
handle_request_disconnect(
f"Completion generation {request.state.id} cancelled by user."
)
generation = await gen_queue.get()
# Stream collector will push an exception to the queue if it fails
if isinstance(generation, Exception):
raise generation
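            # Wrap the queued generation in a response chunk and yield it as JSON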
response = _create_response(request.state.id, generation, model_path.name)
yield response.model_dump_json()
# Check if all tasks are completed
if all(task.done() for task in gen_tasks) and gen_queue.empty():
yield "[DONE]"
logger.info(f"Finished streaming completion request {request.state.id}")
break
except CancelledError:
# Get out if the request gets disconnected
if not disconnect_task.done():
abort_event.set()
handle_request_disconnect(
f"Completion generation {request.state.id} cancelled by user."
)
except Exception:
yield get_generator_error(
f"Completion {request.state.id} aborted. Please check the server console."
)


async def generate_completion(
data: CompletionRequest, request: Request, model_path: pathlib.Path
):
"""Non-streaming generate for completions"""
gen_tasks: List[asyncio.Task] = []
gen_params = data.to_gen_params()
try:
logger.info(f"Recieved completion request {request.state.id}")
for n in range(0, data.n):
# Deepcopy gen params above the first index
# to ensure nested structures aren't shared
if n > 0:
task_gen_params = deepcopy(gen_params)
else:
task_gen_params = gen_params
gen_tasks.append(
asyncio.create_task(
model.container.generate(
data.prompt, request.state.id, **task_gen_params
)
)
)
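        # Wait for all generation tasks to finish before building the response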
generations = await asyncio.gather(*gen_tasks)
response = _create_response(request.state.id, generations, model_path.name)
logger.info(f"Finished completion request {request.state.id}")
return response
except Exception as exc:
error_message = handle_request_error(
f"Completion {request.state.id} aborted. Maybe the model was unloaded? "
"Please check the server console."
).error.message
# Server error if there's a generation exception
raise HTTPException(503, error_message) from exc