From 64e17eb521795e95878fa99b2726fa12f347ea8e Mon Sep 17 00:00:00 2001 From: andersonid Date: Thu, 2 Oct 2025 18:50:56 -0300 Subject: [PATCH] feat: implement VPA CRD support - Add CustomObjectsApi integration for VPA resources - Implement VPA CRUD operations (list, create, delete) - Add VPA recommendation collection via CRD - Add API endpoints for VPA management - Handle VPA installation detection gracefully - Complete TODO #1: CRD para VPA implementation --- app/api/routes.py | 53 +++++++++++++ app/core/kubernetes_client.py | 144 ++++++++++++++++++++++++++++++++-- 2 files changed, 191 insertions(+), 6 deletions(-) diff --git a/app/api/routes.py b/app/api/routes.py index 844a297..4173a91 100644 --- a/app/api/routes.py +++ b/app/api/routes.py @@ -1482,6 +1482,59 @@ async def get_workload_historical_details( logger.error(f"Error getting workload historical details: {str(e)}") raise HTTPException(status_code=500, detail=f"Error getting workload details: {str(e)}") +@api_router.get("/vpa/list") +async def list_vpas( + namespace: Optional[str] = None, + k8s_client=Depends(get_k8s_client) +): + """List VPA resources""" + try: + vpas = await k8s_client.list_vpas(namespace) + return { + "vpas": vpas, + "count": len(vpas), + "namespace": namespace or "all" + } + except Exception as e: + logger.error(f"Error listing VPAs: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@api_router.post("/vpa/create") +async def create_vpa( + namespace: str, + vpa_manifest: dict, + k8s_client=Depends(get_k8s_client) +): + """Create a VPA resource""" + try: + result = await k8s_client.create_vpa(namespace, vpa_manifest) + return { + "message": "VPA created successfully", + "vpa": result, + "namespace": namespace + } + except Exception as e: + logger.error(f"Error creating VPA: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@api_router.delete("/vpa/{vpa_name}") +async def delete_vpa( + vpa_name: str, + namespace: str, + k8s_client=Depends(get_k8s_client) +): + """Delete a VPA resource""" + try: + result = await k8s_client.delete_vpa(vpa_name, namespace) + return { + "message": "VPA deleted successfully", + "vpa_name": vpa_name, + "namespace": namespace + } + except Exception as e: + logger.error(f"Error deleting VPA: {e}") + raise HTTPException(status_code=500, detail=str(e)) + @api_router.get("/health") async def health_check(): """API health check""" diff --git a/app/core/kubernetes_client.py b/app/core/kubernetes_client.py index 122271a..f198092 100644 --- a/app/core/kubernetes_client.py +++ b/app/core/kubernetes_client.py @@ -5,6 +5,7 @@ import logging from typing import List, Dict, Any, Optional from kubernetes import client, config from kubernetes.client.rest import ApiException +from kubernetes.client import CustomObjectsApi import asyncio import aiohttp @@ -20,6 +21,7 @@ class K8sClient: self.v1 = None self.autoscaling_v1 = None self.apps_v1 = None + self.custom_api = None self.initialized = False async def initialize(self): @@ -68,6 +70,7 @@ class K8sClient: self.v1 = client.CoreV1Api() self.autoscaling_v1 = client.AutoscalingV1Api() self.apps_v1 = client.AppsV1Api() + self.custom_api = CustomObjectsApi() self.initialized = True logger.info("Kubernetes client initialized successfully") @@ -283,18 +286,147 @@ class K8sClient: recommendations = [] try: - # VPA is not available in the standard Kubernetes API - # TODO: Implement using Custom Resource Definition (CRD) - logger.warning("VPA is not available in the standard Kubernetes API") - return [] + # VPA uses Custom Resource Definition (CRD) + # Check if VPA is installed by trying to list VPAs + vpa_list = self.custom_api.list_cluster_custom_object( + group="autoscaling.k8s.io", + version="v1", + plural="verticalpodautoscalers" + ) + + for vpa_item in vpa_list.get('items', []): + vpa_name = vpa_item.get('metadata', {}).get('name', 'unknown') + namespace = vpa_item.get('metadata', {}).get('namespace', 'default') + + # Extract VPA status and recommendations + status = vpa_item.get('status', {}) + recommendation = status.get('recommendation', {}) + + if recommendation: + # Extract container recommendations + container_recommendations = recommendation.get('containerRecommendations', []) + for container_rec in container_recommendations: + container_name = container_rec.get('containerName', 'unknown') + + # Extract CPU and memory recommendations + target_cpu = container_rec.get('target', {}).get('cpu', '0') + target_memory = container_rec.get('target', {}).get('memory', '0') + lower_bound_cpu = container_rec.get('lowerBound', {}).get('cpu', '0') + lower_bound_memory = container_rec.get('lowerBound', {}).get('memory', '0') + upper_bound_cpu = container_rec.get('upperBound', {}).get('cpu', '0') + upper_bound_memory = container_rec.get('upperBound', {}).get('memory', '0') + + vpa_rec = VPARecommendation( + vpa_name=vpa_name, + namespace=namespace, + container_name=container_name, + target_cpu=target_cpu, + target_memory=target_memory, + lower_bound_cpu=lower_bound_cpu, + lower_bound_memory=lower_bound_memory, + upper_bound_cpu=upper_bound_cpu, + upper_bound_memory=upper_bound_memory, + uncapped_target_cpu=container_rec.get('uncappedTarget', {}).get('cpu', '0'), + uncapped_target_memory=container_rec.get('uncappedTarget', {}).get('memory', '0') + ) + recommendations.append(vpa_rec) logger.info(f"Collected {len(recommendations)} VPA recommendations") return recommendations except ApiException as e: - logger.error(f"Error collecting VPA recommendations: {e}") - # VPA may not be installed, return empty list + if e.status == 404: + logger.warning("VPA CRD not found - VPA may not be installed in the cluster") + else: + logger.error(f"Error collecting VPA recommendations: {e}") return [] + except Exception as e: + logger.error(f"Unexpected error collecting VPA recommendations: {e}") + return [] + + async def list_vpas(self, namespace: str = None) -> List[Dict[str, Any]]: + """List VPA resources""" + try: + if not self.initialized: + raise RuntimeError("Kubernetes client not initialized") + + if namespace: + # List VPAs in specific namespace + vpa_list = self.custom_api.list_namespaced_custom_object( + group="autoscaling.k8s.io", + version="v1", + namespace=namespace, + plural="verticalpodautoscalers" + ) + else: + # List all VPAs + vpa_list = self.custom_api.list_cluster_custom_object( + group="autoscaling.k8s.io", + version="v1", + plural="verticalpodautoscalers" + ) + + return vpa_list.get('items', []) + + except ApiException as e: + if e.status == 404: + logger.warning("VPA CRD not found - VPA may not be installed in the cluster") + else: + logger.error(f"Error listing VPAs: {e}") + return [] + except Exception as e: + logger.error(f"Unexpected error listing VPAs: {e}") + return [] + + async def create_vpa(self, namespace: str, vpa_manifest: Dict[str, Any]) -> Dict[str, Any]: + """Create a VPA resource""" + try: + if not self.initialized: + raise RuntimeError("Kubernetes client not initialized") + + # Create VPA using custom object API + result = self.custom_api.create_namespaced_custom_object( + group="autoscaling.k8s.io", + version="v1", + namespace=namespace, + plural="verticalpodautoscalers", + body=vpa_manifest + ) + + logger.info(f"Successfully created VPA {vpa_manifest.get('metadata', {}).get('name')} in namespace {namespace}") + return result + + except ApiException as e: + logger.error(f"Error creating VPA: {e}") + raise + except Exception as e: + logger.error(f"Unexpected error creating VPA: {e}") + raise + + async def delete_vpa(self, vpa_name: str, namespace: str) -> Dict[str, Any]: + """Delete a VPA resource""" + try: + if not self.initialized: + raise RuntimeError("Kubernetes client not initialized") + + # Delete VPA using custom object API + result = self.custom_api.delete_namespaced_custom_object( + group="autoscaling.k8s.io", + version="v1", + namespace=namespace, + plural="verticalpodautoscalers", + name=vpa_name + ) + + logger.info(f"Successfully deleted VPA {vpa_name} from namespace {namespace}") + return result + + except ApiException as e: + logger.error(f"Error deleting VPA: {e}") + raise + except Exception as e: + logger.error(f"Unexpected error deleting VPA: {e}") + raise async def patch_deployment(self, deployment_name: str, namespace: str, patch_body: dict) -> dict: """Patch a deployment with new configuration"""