feat: implement VPA CRD support

- Add CustomObjectsApi integration for VPA resources
- Implement VPA CRUD operations (list, create, delete)
- Add VPA recommendation collection via CRD
- Add API endpoints for VPA management
- Handle VPA installation detection gracefully
- Complete TODO #1: CRD para VPA implementation
This commit is contained in:
2025-10-02 18:50:56 -03:00
parent 213359b18f
commit 64e17eb521
2 changed files with 191 additions and 6 deletions

View File

@@ -1482,6 +1482,59 @@ async def get_workload_historical_details(
logger.error(f"Error getting workload historical details: {str(e)}") logger.error(f"Error getting workload historical details: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error getting workload details: {str(e)}") raise HTTPException(status_code=500, detail=f"Error getting workload details: {str(e)}")
@api_router.get("/vpa/list")
async def list_vpas(
namespace: Optional[str] = None,
k8s_client=Depends(get_k8s_client)
):
"""List VPA resources"""
try:
vpas = await k8s_client.list_vpas(namespace)
return {
"vpas": vpas,
"count": len(vpas),
"namespace": namespace or "all"
}
except Exception as e:
logger.error(f"Error listing VPAs: {e}")
raise HTTPException(status_code=500, detail=str(e))
@api_router.post("/vpa/create")
async def create_vpa(
namespace: str,
vpa_manifest: dict,
k8s_client=Depends(get_k8s_client)
):
"""Create a VPA resource"""
try:
result = await k8s_client.create_vpa(namespace, vpa_manifest)
return {
"message": "VPA created successfully",
"vpa": result,
"namespace": namespace
}
except Exception as e:
logger.error(f"Error creating VPA: {e}")
raise HTTPException(status_code=500, detail=str(e))
@api_router.delete("/vpa/{vpa_name}")
async def delete_vpa(
vpa_name: str,
namespace: str,
k8s_client=Depends(get_k8s_client)
):
"""Delete a VPA resource"""
try:
result = await k8s_client.delete_vpa(vpa_name, namespace)
return {
"message": "VPA deleted successfully",
"vpa_name": vpa_name,
"namespace": namespace
}
except Exception as e:
logger.error(f"Error deleting VPA: {e}")
raise HTTPException(status_code=500, detail=str(e))
@api_router.get("/health") @api_router.get("/health")
async def health_check(): async def health_check():
"""API health check""" """API health check"""

View File

@@ -5,6 +5,7 @@ import logging
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
from kubernetes import client, config from kubernetes import client, config
from kubernetes.client.rest import ApiException from kubernetes.client.rest import ApiException
from kubernetes.client import CustomObjectsApi
import asyncio import asyncio
import aiohttp import aiohttp
@@ -20,6 +21,7 @@ class K8sClient:
self.v1 = None self.v1 = None
self.autoscaling_v1 = None self.autoscaling_v1 = None
self.apps_v1 = None self.apps_v1 = None
self.custom_api = None
self.initialized = False self.initialized = False
async def initialize(self): async def initialize(self):
@@ -68,6 +70,7 @@ class K8sClient:
self.v1 = client.CoreV1Api() self.v1 = client.CoreV1Api()
self.autoscaling_v1 = client.AutoscalingV1Api() self.autoscaling_v1 = client.AutoscalingV1Api()
self.apps_v1 = client.AppsV1Api() self.apps_v1 = client.AppsV1Api()
self.custom_api = CustomObjectsApi()
self.initialized = True self.initialized = True
logger.info("Kubernetes client initialized successfully") logger.info("Kubernetes client initialized successfully")
@@ -283,18 +286,147 @@ class K8sClient:
recommendations = [] recommendations = []
try: try:
# VPA is not available in the standard Kubernetes API # VPA uses Custom Resource Definition (CRD)
# TODO: Implement using Custom Resource Definition (CRD) # Check if VPA is installed by trying to list VPAs
logger.warning("VPA is not available in the standard Kubernetes API") vpa_list = self.custom_api.list_cluster_custom_object(
return [] group="autoscaling.k8s.io",
version="v1",
plural="verticalpodautoscalers"
)
for vpa_item in vpa_list.get('items', []):
vpa_name = vpa_item.get('metadata', {}).get('name', 'unknown')
namespace = vpa_item.get('metadata', {}).get('namespace', 'default')
# Extract VPA status and recommendations
status = vpa_item.get('status', {})
recommendation = status.get('recommendation', {})
if recommendation:
# Extract container recommendations
container_recommendations = recommendation.get('containerRecommendations', [])
for container_rec in container_recommendations:
container_name = container_rec.get('containerName', 'unknown')
# Extract CPU and memory recommendations
target_cpu = container_rec.get('target', {}).get('cpu', '0')
target_memory = container_rec.get('target', {}).get('memory', '0')
lower_bound_cpu = container_rec.get('lowerBound', {}).get('cpu', '0')
lower_bound_memory = container_rec.get('lowerBound', {}).get('memory', '0')
upper_bound_cpu = container_rec.get('upperBound', {}).get('cpu', '0')
upper_bound_memory = container_rec.get('upperBound', {}).get('memory', '0')
vpa_rec = VPARecommendation(
vpa_name=vpa_name,
namespace=namespace,
container_name=container_name,
target_cpu=target_cpu,
target_memory=target_memory,
lower_bound_cpu=lower_bound_cpu,
lower_bound_memory=lower_bound_memory,
upper_bound_cpu=upper_bound_cpu,
upper_bound_memory=upper_bound_memory,
uncapped_target_cpu=container_rec.get('uncappedTarget', {}).get('cpu', '0'),
uncapped_target_memory=container_rec.get('uncappedTarget', {}).get('memory', '0')
)
recommendations.append(vpa_rec)
logger.info(f"Collected {len(recommendations)} VPA recommendations") logger.info(f"Collected {len(recommendations)} VPA recommendations")
return recommendations return recommendations
except ApiException as e: except ApiException as e:
logger.error(f"Error collecting VPA recommendations: {e}") if e.status == 404:
# VPA may not be installed, return empty list logger.warning("VPA CRD not found - VPA may not be installed in the cluster")
else:
logger.error(f"Error collecting VPA recommendations: {e}")
return [] return []
except Exception as e:
logger.error(f"Unexpected error collecting VPA recommendations: {e}")
return []
async def list_vpas(self, namespace: str = None) -> List[Dict[str, Any]]:
"""List VPA resources"""
try:
if not self.initialized:
raise RuntimeError("Kubernetes client not initialized")
if namespace:
# List VPAs in specific namespace
vpa_list = self.custom_api.list_namespaced_custom_object(
group="autoscaling.k8s.io",
version="v1",
namespace=namespace,
plural="verticalpodautoscalers"
)
else:
# List all VPAs
vpa_list = self.custom_api.list_cluster_custom_object(
group="autoscaling.k8s.io",
version="v1",
plural="verticalpodautoscalers"
)
return vpa_list.get('items', [])
except ApiException as e:
if e.status == 404:
logger.warning("VPA CRD not found - VPA may not be installed in the cluster")
else:
logger.error(f"Error listing VPAs: {e}")
return []
except Exception as e:
logger.error(f"Unexpected error listing VPAs: {e}")
return []
async def create_vpa(self, namespace: str, vpa_manifest: Dict[str, Any]) -> Dict[str, Any]:
"""Create a VPA resource"""
try:
if not self.initialized:
raise RuntimeError("Kubernetes client not initialized")
# Create VPA using custom object API
result = self.custom_api.create_namespaced_custom_object(
group="autoscaling.k8s.io",
version="v1",
namespace=namespace,
plural="verticalpodautoscalers",
body=vpa_manifest
)
logger.info(f"Successfully created VPA {vpa_manifest.get('metadata', {}).get('name')} in namespace {namespace}")
return result
except ApiException as e:
logger.error(f"Error creating VPA: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error creating VPA: {e}")
raise
async def delete_vpa(self, vpa_name: str, namespace: str) -> Dict[str, Any]:
"""Delete a VPA resource"""
try:
if not self.initialized:
raise RuntimeError("Kubernetes client not initialized")
# Delete VPA using custom object API
result = self.custom_api.delete_namespaced_custom_object(
group="autoscaling.k8s.io",
version="v1",
namespace=namespace,
plural="verticalpodautoscalers",
name=vpa_name
)
logger.info(f"Successfully deleted VPA {vpa_name} from namespace {namespace}")
return result
except ApiException as e:
logger.error(f"Error deleting VPA: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error deleting VPA: {e}")
raise
async def patch_deployment(self, deployment_name: str, namespace: str, patch_body: dict) -> dict: async def patch_deployment(self, deployment_name: str, namespace: str, patch_body: dict) -> dict:
"""Patch a deployment with new configuration""" """Patch a deployment with new configuration"""