Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ spec:
- --loglevel=INFO
- --concurrency=2
{{- if .Values.serviceIdentifier }}
- --queues=model-engine-{{ .Values.serviceIdentifier }}.service-builder
- --queues=model-engine-{{ .Values.serviceIdentifier }}-service-builder
{{- else }}
- --queues=model-engine.service-builder
- --queues=model-engine-service-builder
{{- end }}
resources:
{{- toYaml .Values.resources | nindent 12 }}
Expand Down
4 changes: 2 additions & 2 deletions model-engine/model_engine_server/common/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ def get_sync_endpoint_elb_url(deployment_name: str) -> str:

def get_service_builder_queue(service_identifier=None):
return (
f"{SERVICE_BUILDER_QUEUE_PREFIX}-{service_identifier}.{SERVICE_BUILDER_QUEUE_SUFFIX}"
f"{SERVICE_BUILDER_QUEUE_PREFIX}-{service_identifier}-{SERVICE_BUILDER_QUEUE_SUFFIX}"
if service_identifier
else f"{SERVICE_BUILDER_QUEUE_PREFIX}.{SERVICE_BUILDER_QUEUE_SUFFIX}"
else f"{SERVICE_BUILDER_QUEUE_PREFIX}-{SERVICE_BUILDER_QUEUE_SUFFIX}"
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from model_engine_server.core.config import infra_config
from model_engine_server.db.base import SessionAsyncNullPool
from model_engine_server.domain.entities import BatchJobSerializationFormat
from model_engine_server.domain.gateways import TaskQueueGateway
from model_engine_server.infra.gateways import (
ABSFilesystemGateway,
CeleryTaskQueueGateway,
Expand Down Expand Up @@ -78,17 +79,22 @@ async def run_batch_job(
)

resource_gateway = LiveEndpointResourceGateway(queue_delegate=queue_delegate)
model_endpoint_cache_repo = RedisModelEndpointCacheRepository(
redis_client=redis,
)
inference_task_queue_gateway = (
servicebus_task_queue_gateway
if infra_config().cloud_provider == "azure"
else sqs_task_queue_gateway
)

inference_task_queue_gateway: TaskQueueGateway
infra_task_queue_gateway: TaskQueueGateway
if infra_config().cloud_provider == "azure":
inference_task_queue_gateway = servicebus_task_queue_gateway
infra_task_queue_gateway = servicebus_task_queue_gateway
else:
inference_task_queue_gateway = sqs_task_queue_gateway
infra_task_queue_gateway = sqs_task_queue_gateway

model_endpoint_infra_gateway = LiveModelEndpointInfraGateway(
resource_gateway=resource_gateway,
task_queue_gateway=inference_task_queue_gateway,
task_queue_gateway=infra_task_queue_gateway,
)
model_endpoint_cache_repo = RedisModelEndpointCacheRepository(
redis_client=redis,
)
async_model_endpoint_inference_gateway = LiveAsyncModelEndpointInferenceGateway(
task_queue_gateway=inference_task_queue_gateway
Expand Down
13 changes: 10 additions & 3 deletions model-engine/model_engine_server/service_builder/celery.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
from model_engine_server.common.dtos.model_endpoints import BrokerType
from model_engine_server.common.env_vars import CIRCLECI
from model_engine_server.core.celery import celery_app
from model_engine_server.core.config import infra_config

service_builder_broker_type: str
if CIRCLECI:
service_builder_broker_type = str(BrokerType.REDIS.value)
elif infra_config().cloud_provider == "azure":
service_builder_broker_type = str(BrokerType.SERVICEBUS.value)
else:
service_builder_broker_type = str(BrokerType.SQS.value)

service_builder_service = celery_app(
name="model_engine_server.service_builder",
modules=[
"model_engine_server.service_builder.tasks_v1",
],
s3_bucket=infra_config().s3_bucket,
broker_type=str(BrokerType.SERVICEBUS.value)
if infra_config().cloud_provider == "azure"
else str(BrokerType.REDIS.value),
broker_type=service_builder_broker_type,
backend_protocol="abs" if infra_config().cloud_provider == "azure" else "s3",
)

Expand Down