From 806f03f04fc3a6190813af80d2ed64ac967ed59c Mon Sep 17 00:00:00 2001 From: Yunfeng Bai Date: Mon, 16 Oct 2023 16:30:37 -0700 Subject: [PATCH 1/4] Add pod disruption budget to all endpoints --- .../service_template_config_map.yaml | 13 +++++ .../gateways/resources/image_cache_gateway.py | 3 - .../k8s_endpoint_resource_delegate.py | 55 +++++++++++++++++++ .../gateways/resources/k8s_resource_types.py | 23 +++++++- 4 files changed, 89 insertions(+), 5 deletions(-) diff --git a/charts/model-engine/templates/service_template_config_map.yaml b/charts/model-engine/templates/service_template_config_map.yaml index 8eda04b1a..1df031404 100644 --- a/charts/model-engine/templates/service_template_config_map.yaml +++ b/charts/model-engine/templates/service_template_config_map.yaml @@ -515,6 +515,19 @@ data: cpu: ${CPUS} memory: ${MEMORY} controlledResources: ["cpu", "memory"] + pod-disruption-budget.yaml: |- + apiVersion: policy/v1 + kind: PodDisruptionBudget + metadata: + name: ${RESOURCE_NAME} + namespace: ${NAMESPACE} + labels: + {{- $service_template_labels | nindent 8 }} + spec: + minAvailable: 1 + selector: + matchLabels: + app: ${RESOURCE_NAME} batch-job-orchestration-job.yaml: |- apiVersion: batch/v1 kind: Job diff --git a/model-engine/model_engine_server/infra/gateways/resources/image_cache_gateway.py b/model-engine/model_engine_server/infra/gateways/resources/image_cache_gateway.py index bdd15e27c..fc5a7e54b 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/image_cache_gateway.py +++ b/model-engine/model_engine_server/infra/gateways/resources/image_cache_gateway.py @@ -23,9 +23,6 @@ class CachedImages(TypedDict): t4: List[str] -KUBERNETES_MAX_LENGTH = 64 - - class ImageCacheGateway: async def create_or_update_image_cache(self, cached_images: CachedImages) -> None: """ diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py index 568365965..e54b9cba2 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py @@ -67,6 +67,7 @@ _kubernetes_core_api = None _kubernetes_autoscaling_api = None _kubernetes_batch_api = None +_kubernetes_policy_api = None _kubernetes_custom_objects_api = None _kubernetes_cluster_version = None @@ -147,6 +148,16 @@ def get_kubernetes_batch_client(): # pragma: no cover return _kubernetes_batch_api +def get_kubernetes_policy_client(): # pragma: no cover + if _lazy_load_kubernetes_clients: + global _kubernetes_policy_api + else: + _kubernetes_policy_api = None + if not _kubernetes_policy_api: + _kubernetes_policy_api = kubernetes_asyncio.client.PolicyV1Api() + return _kubernetes_policy_api + + def get_kubernetes_custom_objects_client(): # pragma: no cover if _lazy_load_kubernetes_clients: global _kubernetes_custom_objects_api @@ -599,6 +610,37 @@ async def _create_vpa(vpa: Dict[str, Any], name: str) -> None: logger.exception("Got an exception when trying to apply the VerticalPodAutoscaler") raise + @staticmethod + async def _create_pdb(pdb: Dict[str, Any], name: str) -> None: + """ + Lower-level function to create/patch a k8s PodDisruptionBudget (pdb) + Args: + pdb: PDB body (a nested Dict in the format specified by Kubernetes) + name: The name of the pdb on K8s + + Returns: + Nothing; raises a k8s ApiException if failure + + """ + policy_api = get_kubernetes_policy_client() + try: + await policy_api.create_namespaced_pod_disruption_budget( + namespace=hmi_config.endpoint_namespace, + body=pdb, + ) + except ApiException as exc: + if exc.status == 409: + logger.info(f"PodDisruptionBudget {name} already exists, replacing") + + await policy_api.patch_namespaced_pod_disruption_budget( + name=name, + namespace=hmi_config.endpoint_namespace, + body=pdb, + ) + else: + logger.exception("Got an exception when trying to apply the PodDisruptionBudget") + raise + @staticmethod async def _create_keda_scaled_object(scaled_object: Dict[str, Any], name: str) -> None: custom_objects_api = get_kubernetes_custom_objects_client() @@ -1152,6 +1194,19 @@ async def _create_or_update_resources( name=k8s_resource_group_name, ) + pdb_config_arguments = get_endpoint_resource_arguments_from_request( + k8s_resource_group_name=k8s_resource_group_name, + request=request, + sqs_queue_name=sqs_queue_name_str, + sqs_queue_url=sqs_queue_url_str, + endpoint_resource_name="pod-disruption-budget", + ) + pdb_template = load_k8s_yaml("pod-disruption-budget.yaml", pdb_config_arguments) + await self._create_pdb( + pdb=pdb_template, + name=k8s_resource_group_name, + ) + if model_endpoint_record.endpoint_type in { ModelEndpointType.SYNC, ModelEndpointType.STREAMING, diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py index 9e77ac687..72b2c196e 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py @@ -66,7 +66,7 @@ LAUNCH_HIGH_PRIORITY_CLASS = "model-engine-high-priority" LAUNCH_DEFAULT_PRIORITY_CLASS = "model-engine-default-priority" -KUBERNETES_MAX_LENGTH = 64 +IMAGE_HASH_MAX_LENGTH = 32 FORWARDER_PORT = 5000 USER_CONTAINER_PORT = 5005 ARTIFACT_LIKE_CONTAINER_PORT = FORWARDER_PORT @@ -329,6 +329,12 @@ class VerticalPodAutoscalerArguments(_BaseEndpointArguments): MEMORY: str +class PodDisruptionBudgetArguments(_BaseEndpointArguments): + """Keyword-arguments for substituting into pod disruption budget templates.""" + + pass + + class VirtualServiceArguments(_BaseEndpointArguments): """Keyword-arguments for substituting into virtual-service templates.""" @@ -432,7 +438,7 @@ class VerticalAutoscalingEndpointParams(TypedDict): def compute_image_hash(image: str) -> str: - return str(hashlib.md5(str(image).encode()).hexdigest())[:KUBERNETES_MAX_LENGTH] + return str(hashlib.sha256(str(image).encode()).hexdigest())[:IMAGE_HASH_MAX_LENGTH] def container_start_triton_cmd( @@ -1184,5 +1190,18 @@ def get_endpoint_resource_arguments_from_request( CPUS=str(build_endpoint_request.cpus), MEMORY=str(build_endpoint_request.memory), ) + elif endpoint_resource_name == "pod-disruption-budget": + return PodDisruptionBudgetArguments( + # Base resource arguments + RESOURCE_NAME=k8s_resource_group_name, + NAMESPACE=hmi_config.endpoint_namespace, + ENDPOINT_ID=model_endpoint_record.id, + ENDPOINT_NAME=model_endpoint_record.name, + TEAM=team, + PRODUCT=product, + CREATED_BY=created_by, + OWNER=owner, + GIT_TAG=GIT_TAG, + ) else: raise Exception(f"Unknown resource name: {endpoint_resource_name}") From cc5a24261892ea6a9459d670813ee6c23387ff1d Mon Sep 17 00:00:00 2001 From: Yunfeng Bai Date: Mon, 16 Oct 2023 16:40:20 -0700 Subject: [PATCH 2/4] Delete pdb as well --- .../k8s_endpoint_resource_delegate.py | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py index e54b9cba2..9e129be6e 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py @@ -1077,6 +1077,27 @@ async def _delete_hpa(endpoint_id: str, deployment_name: str) -> bool: return False return True + @staticmethod + async def _delete_pdb(endpoint_id: str) -> bool: + policy_client = get_kubernetes_policy_client() + k8s_resource_group_name = _endpoint_id_to_k8s_resource_group_name(endpoint_id) + try: + await policy_client.delete_namespaced_pod_disruption_budget( + namespace=hmi_config.endpoint_namespace, + name=k8s_resource_group_name, + ) + except ApiException as e: + if e.status == 404: + logger.warning( + f"Trying to delete nonexistent PodDisruptionBudget {k8s_resource_group_name}" + ) + else: + logger.exception( + f"Deletion of PodDisruptionBudget {k8s_resource_group_name} failed" + ) + return False + return True + @staticmethod async def _delete_keda_scaled_object(endpoint_id: str) -> bool: custom_objects_client = get_kubernetes_custom_objects_client() @@ -1616,6 +1637,7 @@ async def _delete_resources_async(self, endpoint_id: str, deployment_name: str) endpoint_id=endpoint_id, deployment_name=deployment_name ) await self._delete_vpa(endpoint_id=endpoint_id) + await self._delete_pdb(endpoint_id=endpoint_id) return deployment_delete_succeeded and config_map_delete_succeeded async def _delete_resources_sync(self, endpoint_id: str, deployment_name: str) -> bool: @@ -1637,6 +1659,7 @@ async def _delete_resources_sync(self, endpoint_id: str, deployment_name: str) - endpoint_id=endpoint_id ) await self._delete_vpa(endpoint_id=endpoint_id) + await self._delete_pdb(endpoint_id=endpoint_id) destination_rule_delete_succeeded = await self._delete_destination_rule( endpoint_id=endpoint_id From 1dc2606283cdf0099992aaf6fd326ab3d3422b21 Mon Sep 17 00:00:00 2001 From: Yunfeng Bai Date: Mon, 16 Oct 2023 16:49:07 -0700 Subject: [PATCH 3/4] fix test --- .../service_template_config_map_circleci.yaml | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml b/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml index e50e16234..606fee3e1 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml +++ b/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml @@ -2728,6 +2728,31 @@ data: cpu: ${CPUS} memory: ${MEMORY} controlledResources: ["cpu", "memory"] + pod-disruption-budget.yaml: |- + apiVersion: policy/v1 + kind: PodDisruptionBudget + metadata: + name: ${RESOURCE_NAME} + namespace: ${NAMESPACE} + labels: + user_id: ${OWNER} + team: ${TEAM} + product: ${PRODUCT} + created_by: ${CREATED_BY} + owner: ${OWNER} + env: circleci + managed-by: model-engine + use_scale_launch_endpoint_network_policy: "true" + tags.datadoghq.com/env: circleci + tags.datadoghq.com/version: ${GIT_TAG} + tags.datadoghq.com/service: ${ENDPOINT_NAME} + endpoint_id: ${ENDPOINT_ID} + endpoint_name: ${ENDPOINT_NAME} + spec: + minAvailable: 1 + selector: + matchLabels: + app: ${RESOURCE_NAME} batch-job-orchestration-job.yaml: |- apiVersion: batch/v1 kind: Job From 2ea4e90c0a59fecba17e87bf45e0cfa454f9ee6f Mon Sep 17 00:00:00 2001 From: Yunfeng Bai Date: Mon, 16 Oct 2023 17:38:11 -0700 Subject: [PATCH 4/4] fix tests --- .../test_k8s_endpoint_resource_delegate.py | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/model-engine/tests/unit/infra/gateways/resources/test_k8s_endpoint_resource_delegate.py b/model-engine/tests/unit/infra/gateways/resources/test_k8s_endpoint_resource_delegate.py index 93e2c8e32..acd298b35 100644 --- a/model-engine/tests/unit/infra/gateways/resources/test_k8s_endpoint_resource_delegate.py +++ b/model-engine/tests/unit/infra/gateways/resources/test_k8s_endpoint_resource_delegate.py @@ -69,6 +69,16 @@ def mock_autoscaling_client(): yield mock_client +@pytest.fixture +def mock_policy_client(): + mock_client = AsyncMock() + with patch( + f"{MODULE_PATH}.get_kubernetes_policy_client", + return_value=mock_client, + ): + yield mock_client + + @pytest.fixture def mock_custom_objects_client(): mock_client = AsyncMock() @@ -276,6 +286,7 @@ async def test_create_async_endpoint_has_correct_labels( mock_apps_client, mock_core_client, mock_autoscaling_client, + mock_policy_client, mock_custom_objects_client, mock_get_kubernetes_cluster_version, create_resources_request_async_runnable_image: CreateOrUpdateResourcesRequest, @@ -323,6 +334,11 @@ async def test_create_async_endpoint_has_correct_labels( ) assert delete_custom_object_call_args_list == [] + # Verify PDB labels + create_pdb_call_args = mock_policy_client.create_namespaced_pod_disruption_budget.call_args + pdb_body = create_pdb_call_args.kwargs["body"] + _verify_non_deployment_labels(pdb_body, request) + if build_endpoint_request.model_endpoint_record.endpoint_type == ModelEndpointType.SYNC: assert create_custom_object_call_args_list == [] _verify_custom_object_plurals( @@ -339,6 +355,7 @@ async def test_create_streaming_endpoint_has_correct_labels( mock_apps_client, mock_core_client, mock_autoscaling_client, + mock_policy_client, mock_custom_objects_client, mock_get_kubernetes_cluster_version, create_resources_request_streaming_runnable_image: CreateOrUpdateResourcesRequest, @@ -365,6 +382,11 @@ async def test_create_streaming_endpoint_has_correct_labels( config_map_body = create_config_map_call_args.kwargs["body"] _verify_non_deployment_labels(config_map_body, request) + # Verify PDB labels + create_pdb_call_args = mock_policy_client.create_namespaced_pod_disruption_budget.call_args + pdb_body = create_pdb_call_args.kwargs["body"] + _verify_non_deployment_labels(pdb_body, request) + # Verify HPA labels create_hpa_call_args = ( mock_autoscaling_client.create_namespaced_horizontal_pod_autoscaler.call_args @@ -406,6 +428,7 @@ async def test_create_sync_endpoint_has_correct_labels( mock_apps_client, mock_core_client, mock_autoscaling_client, + mock_policy_client, mock_custom_objects_client, mock_get_kubernetes_cluster_version, create_resources_request_sync_runnable_image: CreateOrUpdateResourcesRequest, @@ -441,6 +464,11 @@ async def test_create_sync_endpoint_has_correct_labels( hpa_body = create_hpa_call_args.kwargs["body"] _verify_non_deployment_labels(hpa_body, request) + # Verify PDB labels + create_pdb_call_args = mock_policy_client.create_namespaced_pod_disruption_budget.call_args + pdb_body = create_pdb_call_args.kwargs["body"] + _verify_non_deployment_labels(pdb_body, request) + # Make sure that an VPA is created if optimize_costs is True. build_endpoint_request = request.build_endpoint_request optimize_costs = build_endpoint_request.optimize_costs @@ -477,6 +505,7 @@ async def test_create_sync_endpoint_has_correct_k8s_service_type( mock_apps_client, mock_core_client, mock_autoscaling_client, + mock_policy_client, mock_custom_objects_client, mock_get_kubernetes_cluster_version, create_resources_request_sync_runnable_image: CreateOrUpdateResourcesRequest, @@ -531,6 +560,7 @@ async def test_get_resources_async_success( mock_apps_client, mock_core_client, mock_autoscaling_client, + mock_policy_client, mock_custom_objects_client, ): k8s_endpoint_resource_delegate.__setattr__( @@ -590,6 +620,7 @@ async def test_get_resources_sync_success( mock_apps_client, mock_core_client, mock_autoscaling_client, + mock_policy_client, mock_custom_objects_client, ): k8s_endpoint_resource_delegate.__setattr__( @@ -653,6 +684,7 @@ async def test_delete_resources_async_success( mock_apps_client, mock_core_client, mock_autoscaling_client, + mock_policy_client, mock_custom_objects_client, ): deleted = await k8s_endpoint_resource_delegate.delete_resources( @@ -667,6 +699,7 @@ async def test_delete_resources_sync_success( mock_apps_client, mock_core_client, mock_autoscaling_client, + mock_policy_client, mock_custom_objects_client, ): deleted = await k8s_endpoint_resource_delegate.delete_resources(