diff --git a/charts/model-engine/templates/endpoint_builder_deployment.yaml b/charts/model-engine/templates/endpoint_builder_deployment.yaml index 273543f5c..2868e87b7 100644 --- a/charts/model-engine/templates/endpoint_builder_deployment.yaml +++ b/charts/model-engine/templates/endpoint_builder_deployment.yaml @@ -56,9 +56,9 @@ spec: - --loglevel=INFO - --concurrency=2 {{- if .Values.serviceIdentifier }} - - --queues=model-engine-{{ .Values.serviceIdentifier }}.service-builder + - --queues=model-engine-{{ .Values.serviceIdentifier }}-service-builder {{- else }} - - --queues=model-engine.service-builder + - --queues=model-engine-service-builder {{- end }} resources: {{- toYaml .Values.resources | nindent 12 }} diff --git a/model-engine/model_engine_server/common/settings.py b/model-engine/model_engine_server/common/settings.py index 7dc6c6bb3..7438844a1 100644 --- a/model-engine/model_engine_server/common/settings.py +++ b/model-engine/model_engine_server/common/settings.py @@ -86,9 +86,9 @@ def get_sync_endpoint_elb_url(deployment_name: str) -> str: def get_service_builder_queue(service_identifier=None): return ( - f"{SERVICE_BUILDER_QUEUE_PREFIX}-{service_identifier}.{SERVICE_BUILDER_QUEUE_SUFFIX}" + f"{SERVICE_BUILDER_QUEUE_PREFIX}-{service_identifier}-{SERVICE_BUILDER_QUEUE_SUFFIX}" if service_identifier - else f"{SERVICE_BUILDER_QUEUE_PREFIX}.{SERVICE_BUILDER_QUEUE_SUFFIX}" + else f"{SERVICE_BUILDER_QUEUE_PREFIX}-{SERVICE_BUILDER_QUEUE_SUFFIX}" ) diff --git a/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py b/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py index 6cd8f5af4..c9abea515 100644 --- a/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py +++ b/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py @@ -11,6 +11,7 @@ from model_engine_server.core.config import infra_config from model_engine_server.db.base import SessionAsyncNullPool from model_engine_server.domain.entities import BatchJobSerializationFormat +from model_engine_server.domain.gateways import TaskQueueGateway from model_engine_server.infra.gateways import ( ABSFilesystemGateway, CeleryTaskQueueGateway, @@ -78,17 +79,22 @@ async def run_batch_job( ) resource_gateway = LiveEndpointResourceGateway(queue_delegate=queue_delegate) - model_endpoint_cache_repo = RedisModelEndpointCacheRepository( - redis_client=redis, - ) - inference_task_queue_gateway = ( - servicebus_task_queue_gateway - if infra_config().cloud_provider == "azure" - else sqs_task_queue_gateway - ) + + inference_task_queue_gateway: TaskQueueGateway + infra_task_queue_gateway: TaskQueueGateway + if infra_config().cloud_provider == "azure": + inference_task_queue_gateway = servicebus_task_queue_gateway + infra_task_queue_gateway = servicebus_task_queue_gateway + else: + inference_task_queue_gateway = sqs_task_queue_gateway + infra_task_queue_gateway = sqs_task_queue_gateway + model_endpoint_infra_gateway = LiveModelEndpointInfraGateway( resource_gateway=resource_gateway, - task_queue_gateway=inference_task_queue_gateway, + task_queue_gateway=infra_task_queue_gateway, + ) + model_endpoint_cache_repo = RedisModelEndpointCacheRepository( + redis_client=redis, ) async_model_endpoint_inference_gateway = LiveAsyncModelEndpointInferenceGateway( task_queue_gateway=inference_task_queue_gateway diff --git a/model-engine/model_engine_server/service_builder/celery.py b/model-engine/model_engine_server/service_builder/celery.py index 67cb94b01..06384c9e4 100644 --- a/model-engine/model_engine_server/service_builder/celery.py +++ b/model-engine/model_engine_server/service_builder/celery.py @@ -1,16 +1,23 @@ from model_engine_server.common.dtos.model_endpoints import BrokerType +from model_engine_server.common.env_vars import CIRCLECI from model_engine_server.core.celery import celery_app from model_engine_server.core.config import infra_config +service_builder_broker_type: str +if CIRCLECI: + service_builder_broker_type = str(BrokerType.REDIS.value) +elif infra_config().cloud_provider == "azure": + service_builder_broker_type = str(BrokerType.SERVICEBUS.value) +else: + service_builder_broker_type = str(BrokerType.SQS.value) + service_builder_service = celery_app( name="model_engine_server.service_builder", modules=[ "model_engine_server.service_builder.tasks_v1", ], s3_bucket=infra_config().s3_bucket, - broker_type=str(BrokerType.SERVICEBUS.value) - if infra_config().cloud_provider == "azure" - else str(BrokerType.REDIS.value), + broker_type=service_builder_broker_type, backend_protocol="abs" if infra_config().cloud_provider == "azure" else "s3", )