Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions model-engine/model_engine_server/common/dtos/llms.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,30 @@ class CreateBatchCompletionsRequest(BaseModel):
"""


class CreateBatchCompletionsEngineRequest(CreateBatchCompletionsRequest):
    """
    Internal request model handed to the llm engine. It extends the
    client-facing DTO with additional fields that we want hidden from clients.
    """

    max_gpu_memory_utilization: Optional[float] = Field(default=0.9, le=1.0)
    """
    Maximum GPU memory utilization for the batch inference. Default to 90%.
    """

    @staticmethod
    def from_api(request: CreateBatchCompletionsRequest) -> "CreateBatchCompletionsEngineRequest":
        """Build an engine request from the public DTO, copying every client-visible field."""
        public_fields = dict(
            input_data_path=request.input_data_path,
            output_data_path=request.output_data_path,
            content=request.content,
            model_config=request.model_config,
            data_parallelism=request.data_parallelism,
            max_runtime_sec=request.max_runtime_sec,
            tool_config=request.tool_config,
        )
        return CreateBatchCompletionsEngineRequest(**public_fields)


class CreateBatchCompletionsResponse(BaseModel):
job_id: str

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import math
import os
import re
from dataclasses import asdict
from dataclasses import asdict, dataclass
from typing import Any, AsyncIterable, Dict, List, Optional, Union

from model_engine_server.common.config import hmi_config
Expand All @@ -21,6 +21,7 @@
CompletionStreamV1Response,
CompletionSyncV1Request,
CompletionSyncV1Response,
CreateBatchCompletionsEngineRequest,
CreateBatchCompletionsRequest,
CreateBatchCompletionsResponse,
CreateLLMModelEndpointV1Request,
Expand Down Expand Up @@ -2200,6 +2201,27 @@ async def execute(self, user: User, request: ModelDownloadRequest) -> ModelDownl
return ModelDownloadResponse(urls=urls)


@dataclass
class VLLMEngineArgs:
    """Additional vLLM engine arguments inferred from the model name (not part of the client DTO)."""

    # Fraction of GPU memory the engine may use; None means "leave the engine default in place".
    gpu_memory_utilization: Optional[float] = None


def infer_addition_engine_args_from_model_name(model_name: str) -> VLLMEngineArgs:
    """Derive extra vLLM engine arguments from ``model_name``.

    The last run of digits in the name is read as the parameter count in
    billions (e.g. ``llama-2-70b`` -> 70); models with >= 70B parameters get a
    higher GPU memory utilization cap (0.95) than smaller ones (0.9).

    Raises:
        ObjectHasInvalidValueException: if ``model_name`` contains no digits,
            since the parameter count cannot be inferred.
    """
    matches = re.findall(r"\d+", model_name)
    if not matches:
        raise ObjectHasInvalidValueException(
            f"Model {model_name} is not supported for batch completions."
        )

    # NOTE: for names like "mixtral-8x7b" the trailing number (7) is used.
    billions_of_params = int(matches[-1])
    gpu_memory_utilization = 0.95 if billions_of_params >= 70 else 0.9

    return VLLMEngineArgs(gpu_memory_utilization=gpu_memory_utilization)


def infer_hardware_from_model_name(model_name: str) -> CreateDockerImageBatchJobResourceRequests:
if "mixtral-8x7b" in model_name:
cpus = "20"
Expand Down Expand Up @@ -2324,14 +2346,25 @@ async def execute(
assert hardware.gpus is not None
if request.model_config.num_shards:
hardware.gpus = max(hardware.gpus, request.model_config.num_shards)
request.model_config.num_shards = hardware.gpus

if request.tool_config and request.tool_config.name != "code_evaluator":
engine_request = CreateBatchCompletionsEngineRequest.from_api(request)
engine_request.model_config.num_shards = hardware.gpus

if engine_request.tool_config and engine_request.tool_config.name != "code_evaluator":
raise ObjectHasInvalidValueException(
"Only code_evaluator tool is supported for batch completions."
)

batch_bundle = await self.create_batch_job_bundle(user, request, hardware)
additional_engine_args = infer_addition_engine_args_from_model_name(
engine_request.model_config.model
)

if additional_engine_args.gpu_memory_utilization is not None:
engine_request.max_gpu_memory_utilization = (
additional_engine_args.gpu_memory_utilization
)

batch_bundle = await self.create_batch_job_bundle(user, engine_request, hardware)

validate_resource_requests(
bundle=batch_bundle,
Expand All @@ -2342,21 +2375,21 @@ async def execute(
gpu_type=hardware.gpu_type,
)

if request.max_runtime_sec is None or request.max_runtime_sec < 1:
if engine_request.max_runtime_sec is None or engine_request.max_runtime_sec < 1:
raise ObjectHasInvalidValueException("max_runtime_sec must be a positive integer.")

job_id = await self.docker_image_batch_job_gateway.create_docker_image_batch_job(
created_by=user.user_id,
owner=user.team_id,
job_config=request.dict(),
job_config=engine_request.dict(),
env=batch_bundle.env,
command=batch_bundle.command,
repo=batch_bundle.image_repository,
tag=batch_bundle.image_tag,
resource_requests=hardware,
labels=request.model_config.labels,
labels=engine_request.model_config.labels,
mount_location=batch_bundle.mount_location,
override_job_max_runtime_s=request.max_runtime_sec,
num_workers=request.data_parallelism,
override_job_max_runtime_s=engine_request.max_runtime_sec,
num_workers=engine_request.data_parallelism,
)
return CreateBatchCompletionsResponse(job_id=job_id)
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from func_timeout import FunctionTimedOut, func_set_timeout
from model_engine_server.common.dtos.llms import (
CompletionOutput,
CreateBatchCompletionsRequest,
CreateBatchCompletionsEngineRequest,
CreateBatchCompletionsRequestContent,
TokenOutput,
ToolConfig,
Expand Down Expand Up @@ -145,7 +145,7 @@ def random_uuid() -> str:
return str(uuid.uuid4().hex)


def get_vllm_engine(model, request):
def get_vllm_engine(model: str, request: CreateBatchCompletionsEngineRequest):
from vllm import AsyncEngineArgs, AsyncLLMEngine

engine_args = AsyncEngineArgs(
Expand All @@ -154,7 +154,7 @@ def get_vllm_engine(model, request):
tensor_parallel_size=request.model_config.num_shards,
seed=request.model_config.seed or 0,
disable_log_requests=True,
gpu_memory_utilization=0.9,
gpu_memory_utilization=request.max_gpu_memory_utilization or 0.9,
)

llm = AsyncLLMEngine.from_engine_args(engine_args)
Expand Down Expand Up @@ -313,7 +313,7 @@ def tool_func(text: str, past_context: Optional[str]):
async def batch_inference():
job_index = int(os.getenv("JOB_COMPLETION_INDEX", 0))

request = CreateBatchCompletionsRequest.parse_file(CONFIG_FILE)
request = CreateBatchCompletionsEngineRequest.parse_file(CONFIG_FILE)

if request.model_config.checkpoint_path is not None:
download_model(request.model_config.checkpoint_path, MODEL_WEIGHTS_FOLDER)
Expand Down
17 changes: 12 additions & 5 deletions model-engine/tests/unit/inference/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest
from model_engine_server.common.dtos.llms import (
CompletionOutput,
CreateBatchCompletionsEngineRequest,
CreateBatchCompletionsModelConfig,
CreateBatchCompletionsRequest,
CreateBatchCompletionsRequestContent,
Expand All @@ -12,14 +13,20 @@


@pytest.fixture
def create_batch_completions_engine_request() -> CreateBatchCompletionsEngineRequest:
    """Minimal, fully-populated engine request used by the inference unit tests."""
    return CreateBatchCompletionsEngineRequest(
        input_data_path="input_data_path",
        output_data_path="output_data_path",
        model_config=CreateBatchCompletionsModelConfig(
            model="model",
            checkpoint_path="checkpoint_path",
            labels={},
            seed=123,
            num_shards=4,
        ),
        data_parallelism=1,
        max_runtime_sec=86400,
        # Exercises the engine-only field that the public DTO does not expose.
        max_gpu_memory_utilization=0.95,
    )


Expand Down
Loading