"""
|
|
API Routes
|
|
"""
|
|
import logging
|
|
from typing import List, Optional
|
|
from datetime import datetime
|
|
from fastapi import APIRouter, HTTPException, Depends, Request
|
|
from fastapi.responses import FileResponse
|
|
|
|
from app.models.resource_models import (
|
|
ClusterReport, NamespaceReport, ExportRequest,
|
|
ApplyRecommendationRequest, WorkloadCategory, SmartRecommendation,
|
|
PodHealthScore, SimplifiedValidation
|
|
)
|
|
from app.services.validation_service import ValidationService
|
|
from app.services.report_service import ReportService
|
|
from app.services.historical_analysis import HistoricalAnalysisService
|
|
from app.services.smart_recommendations import SmartRecommendationsService
|
|
from app.core.prometheus_client import PrometheusClient
|
|
from app.core.thanos_client import ThanosClient

logger = logging.getLogger(__name__)

# Create router
api_router = APIRouter()

# Initialize services
validation_service = ValidationService()
report_service = ReportService()
smart_recommendations_service = SmartRecommendationsService()

def get_k8s_client(request: Request):
    """Dependency to get the Kubernetes client"""
    return request.app.state.k8s_client

def get_prometheus_client(request: Request):
    """Dependency to get the Prometheus client"""
    return request.app.state.prometheus_client

def _extract_workload_name(pod_name: str) -> str:
    """Extract the workload name from a pod name (strip the ReplicaSet hash and suffix)"""
    # Pod names typically follow the pattern: workload-name-hash-suffix,
    # e.g. resource-governance-798b5579d6-7h298 -> resource-governance.
    # This is a heuristic: StatefulSet pods (e.g. "app-0") and bare pods fall
    # through and are returned unchanged.
    parts = pod_name.split('-')
    if len(parts) >= 3 and parts[-1].isalnum() and len(parts[-1]) == 5:
        # Remove the last two parts (ReplicaSet hash and pod suffix)
        return '-'.join(parts[:-2])
    elif len(parts) >= 2 and parts[-1].isalnum() and len(parts[-1]) == 5:
        # Remove the last part (pod suffix only)
        return '-'.join(parts[:-1])
    return pod_name

@api_router.get("/cluster/status")
async def get_cluster_status(
    k8s_client=Depends(get_k8s_client),
    prometheus_client=Depends(get_prometheus_client)
):
    """Get overall cluster status"""
    try:
        # Collect basic data
        pods = await k8s_client.get_all_pods()
        nodes_info = await k8s_client.get_nodes_info()

        # Validate resources with historical analysis, grouped by workload
        # (more reliable than analyzing pods individually)
        all_validations = []

        # Group pods by namespace for workload analysis
        namespace_pods = {}
        for pod in pods:
            if pod.namespace not in namespace_pods:
                namespace_pods[pod.namespace] = []
            namespace_pods[pod.namespace].append(pod)

        # Analyze each namespace's workloads, falling back first to per-pod
        # historical analysis and finally to static validation on errors
        for namespace, namespace_pod_list in namespace_pods.items():
            try:
                workload_validations = await validation_service.validate_workload_resources_with_historical_analysis(
                    namespace_pod_list, "24h"
                )
                all_validations.extend(workload_validations)
            except Exception as e:
                logger.warning(f"Error in workload analysis for namespace {namespace}: {e}")
                # Fallback to individual pod analysis
                for pod in namespace_pod_list:
                    try:
                        pod_validations = await validation_service.validate_pod_resources_with_historical_analysis(pod, "24h")
                        all_validations.extend(pod_validations)
                    except Exception as pod_e:
                        logger.warning(f"Error in historical analysis for pod {pod.name}: {pod_e}")
                        # Final fallback to static validations only
                        try:
                            static_validations = validation_service.validate_pod_resources(pod)
                            all_validations.extend(static_validations)
                        except Exception as static_e:
                            logger.error(f"Error in static validation for pod {pod.name}: {static_e}")

        # Get overcommit information
        overcommit_info = await prometheus_client.get_cluster_overcommit()

        # Get resource utilization information
        resource_utilization_info = await prometheus_client.get_cluster_resource_utilization()

        # Get VPA recommendations (currently not included in the response)
        vpa_recommendations = await k8s_client.get_vpa_recommendations()

        # Group pods by namespace for the frontend
        namespaces_data = {}
        pod_validations_map = {}

        # Create a map of pod validations (static + historical)
        for validation in all_validations:
            pod_key = f"{validation.namespace}/{validation.pod_name}"
            if pod_key not in pod_validations_map:
                pod_validations_map[pod_key] = []
            pod_validations_map[pod_key].append(validation)

        for pod in pods:
            namespace = pod.namespace
            if namespace not in namespaces_data:
                namespaces_data[namespace] = {
                    'namespace': namespace,
                    'pods': {},
                    'total_validations': 0,
                    'severity_breakdown': {'error': 0, 'warning': 0, 'info': 0}
                }

            # Add pod to namespace
            pod_name = pod.name
            pod_key = f"{namespace}/{pod_name}"
            pod_validations = pod_validations_map.get(pod_key, [])

            # Convert the pod to the format expected by the frontend
            pod_data = {
                'pod_name': pod_name,
                'namespace': namespace,
                'phase': pod.phase,
                'node_name': pod.node_name,
                'containers': [],
                'validations': []
            }

            # Add containers
            for container in pod.containers:
                container_data = {
                    'name': container['name'],
                    'image': container['image'],
                    'resources': container['resources']
                }
                pod_data['containers'].append(container_data)

            # Add validations for this pod
            for validation in pod_validations:
                validation_data = {
                    'rule_name': validation.validation_type,
                    'namespace': namespace,
                    'message': validation.message,
                    'recommendation': validation.recommendation,
                    'severity': validation.severity
                }
                pod_data['validations'].append(validation_data)

                # Update the namespace severity breakdown (tolerant of
                # severities beyond error/warning/info, e.g. "critical",
                # which would otherwise raise a KeyError)
                breakdown = namespaces_data[namespace]['severity_breakdown']
                breakdown[validation.severity] = breakdown.get(validation.severity, 0) + 1
                namespaces_data[namespace]['total_validations'] += 1

            namespaces_data[namespace]['pods'][pod_name] = pod_data

        # Convert to the list format expected by the frontend
        namespaces_list = list(namespaces_data.values())

        # Count total errors and warnings
        total_errors = sum(ns['severity_breakdown']['error'] for ns in namespaces_list)
        total_warnings = sum(ns['severity_breakdown']['warning'] for ns in namespaces_list)

        # Process overcommit information
        cpu_overcommit_percent = 0
        memory_overcommit_percent = 0
        namespaces_in_overcommit = 0
        resource_quota_coverage = 0  # currently unused

        # Initialize capacity/request totals up front so the fallback paths
        # and the response below never reference undefined names
        cpu_capacity = 0
        cpu_requests = 0
        memory_capacity = 0
        memory_requests = 0

        if overcommit_info and overcommit_info.get("cpu") and overcommit_info.get("memory"):
            # Extract CPU data
            if overcommit_info["cpu"].get("capacity", {}).get("status") == "success":
                for result in overcommit_info["cpu"]["capacity"].get("data", {}).get("result", []):
                    cpu_capacity += float(result["value"][1])

            if overcommit_info["cpu"].get("requests", {}).get("status") == "success":
                for result in overcommit_info["cpu"]["requests"].get("data", {}).get("result", []):
                    cpu_requests += float(result["value"][1])

            # Extract Memory data
            if overcommit_info["memory"].get("capacity", {}).get("status") == "success":
                for result in overcommit_info["memory"]["capacity"].get("data", {}).get("result", []):
                    memory_capacity += float(result["value"][1])

            if overcommit_info["memory"].get("requests", {}).get("status") == "success":
                for result in overcommit_info["memory"]["requests"].get("data", {}).get("result", []):
                    memory_requests += float(result["value"][1])

            # Calculate overcommit percentages
            if cpu_capacity > 0:
                cpu_overcommit_percent = round((cpu_requests / cpu_capacity) * 100, 1)

            if memory_capacity > 0:
                memory_overcommit_percent = round((memory_requests / memory_capacity) * 100, 1)
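
            # Note: values above 100% mean the summed container requests exceed
            # the cluster's allocatable capacity for that resource, i.e. the
            # cluster is overcommitted; values at or below 100% leave headroom.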

            # Debug logging
            logger.info(f"Overcommit Debug - CPU Capacity: {cpu_capacity}, CPU Requests: {cpu_requests}, CPU Overcommit: {cpu_overcommit_percent}%")
            logger.info(f"Overcommit Debug - Memory Capacity: {memory_capacity}, Memory Requests: {memory_requests}, Memory Overcommit: {memory_overcommit_percent}%")

        # Count namespaces in overcommit (simplified: any namespace with at
        # least one validation finding)
        namespaces_in_overcommit = len([ns for ns in namespaces_list if ns['total_validations'] > 0])

        # Calculate resource utilization (usage vs requests) from Prometheus data
        resource_utilization = 0
        if resource_utilization_info.get('data_source') == 'prometheus':
            resource_utilization = resource_utilization_info.get('overall_utilization_percent', 0)
        else:
            # Simplified fallback when Prometheus data is not available
            if cpu_requests > 0 and memory_requests > 0:
                resource_utilization = 75  # Placeholder fallback

        return {
            "timestamp": datetime.now().isoformat(),
            "total_pods": len(pods),
            "total_namespaces": len(namespaces_list),
            "total_nodes": len(nodes_info) if nodes_info else 0,
            "total_errors": total_errors,
            "total_warnings": total_warnings,
            "namespaces": namespaces_list,
            "overcommit": {
                "cpu_overcommit_percent": cpu_overcommit_percent,
                "memory_overcommit_percent": memory_overcommit_percent,
                "namespaces_in_overcommit": namespaces_in_overcommit,
                "resource_utilization": resource_utilization,
                # Totals are initialized above, so no locals() guard is needed
                "cpu_capacity": cpu_capacity,
                "cpu_requests": cpu_requests,
                "memory_capacity": memory_capacity,
                "memory_requests": memory_requests
            }
        }

    except Exception as e:
        logger.error(f"Error getting cluster status: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.get("/namespace/{namespace}/status")
async def get_namespace_status(
    namespace: str,
    k8s_client=Depends(get_k8s_client),
    prometheus_client=Depends(get_prometheus_client)
):
    """Get the status of a specific namespace"""
    try:
        # Collect namespace data
        namespace_resources = await k8s_client.get_namespace_resources(namespace)

        # Validate resources
        all_validations = []
        for pod in namespace_resources.pods:
            pod_validations = validation_service.validate_pod_resources(pod)
            all_validations.extend(pod_validations)

        # Get resource usage from Prometheus
        resource_usage = await prometheus_client.get_namespace_resource_usage(namespace)

        # Generate the namespace report
        report = report_service.generate_namespace_report(
            namespace=namespace,
            pods=namespace_resources.pods,
            validations=all_validations,
            resource_usage=resource_usage
        )

        return report

    except Exception as e:
        logger.error(f"Error getting namespace {namespace} status: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.get("/pods")
async def get_pods(
    namespace: Optional[str] = None,
    k8s_client=Depends(get_k8s_client)
):
    """List pods with resource information"""
    try:
        if namespace:
            namespace_resources = await k8s_client.get_namespace_resources(namespace)
            return namespace_resources.pods
        else:
            return await k8s_client.get_all_pods()

    except Exception as e:
        logger.error(f"Error listing pods: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.get("/validations")
async def get_validations(
    namespace: Optional[str] = None,
    severity: Optional[str] = None,
    page: int = 1,
    page_size: int = 50,
    k8s_client=Depends(get_k8s_client)
):
    """List resource validations with pagination"""
    try:
        # Collect pods
        if namespace:
            namespace_resources = await k8s_client.get_namespace_resources(namespace)
            pods = namespace_resources.pods
        else:
            pods = await k8s_client.get_all_pods()

        # Validate resources
        all_validations = []
        for pod in pods:
            pod_validations = validation_service.validate_pod_resources(pod)
            all_validations.extend(pod_validations)

        # Filter by severity if specified
        if severity:
            all_validations = [
                v for v in all_validations if v.severity == severity
            ]

        # Pagination
        total = len(all_validations)
        start = (page - 1) * page_size
        end = start + page_size
        paginated_validations = all_validations[start:end]

        return {
            "validations": paginated_validations,
            "pagination": {
                "page": page,
                "page_size": page_size,
                "total": total,
                "total_pages": (total + page_size - 1) // page_size
            }
        }

    except Exception as e:
        logger.error(f"Error getting validations: {e}")
        raise HTTPException(status_code=500, detail=str(e))
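
# Example request (hypothetical host and mount path; adjust to your deployment):
#   curl "http://localhost:8000/api/validations?severity=warning&page=2&page_size=25"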

@api_router.get("/validations/by-namespace")
async def get_validations_by_namespace(
    severity: Optional[str] = None,
    page: int = 1,
    page_size: int = 20,
    include_system_namespaces: bool = False,
    k8s_client=Depends(get_k8s_client)
):
    """List validations grouped by namespace with pagination"""
    try:
        # Collect all pods, applying the system namespace filter
        pods = await k8s_client.get_all_pods(include_system_namespaces=include_system_namespaces)

        # Validate resources and group by namespace
        namespace_validations = {}
        for pod in pods:
            pod_validations = validation_service.validate_pod_resources(pod)

            if pod.namespace not in namespace_validations:
                namespace_validations[pod.namespace] = {
                    "namespace": pod.namespace,
                    "pods": {},
                    "total_validations": 0,
                    "severity_breakdown": {"error": 0, "warning": 0, "info": 0, "critical": 0}
                }

            # Group validations by pod
            if pod.name not in namespace_validations[pod.namespace]["pods"]:
                namespace_validations[pod.namespace]["pods"][pod.name] = {
                    "pod_name": pod.name,
                    "validations": []
                }

            # Filter by severity if specified
            if severity:
                pod_validations = [v for v in pod_validations if v.severity == severity]

            namespace_validations[pod.namespace]["pods"][pod.name]["validations"] = pod_validations
            namespace_validations[pod.namespace]["total_validations"] += len(pod_validations)

            # Count severities (use a local name so the `severity` query
            # parameter is not shadowed, which would break filtering for
            # all subsequent pods)
            for validation in pod_validations:
                validation_severity = validation.severity
                if validation_severity in namespace_validations[pod.namespace]["severity_breakdown"]:
                    namespace_validations[pod.namespace]["severity_breakdown"][validation_severity] += 1
                else:
                    # Handle unknown severity types
                    namespace_validations[pod.namespace]["severity_breakdown"]["info"] += 1

        # Convert to a list and sort by total validations
        namespace_list = list(namespace_validations.values())
        namespace_list.sort(key=lambda x: x["total_validations"], reverse=True)

        # Pagination
        total = len(namespace_list)
        start = (page - 1) * page_size
        end = start + page_size
        paginated_namespaces = namespace_list[start:end]

        return {
            "namespaces": paginated_namespaces,
            "pagination": {
                "page": page,
                "page_size": page_size,
                "total": total,
                "total_pages": (total + page_size - 1) // page_size
            }
        }

    except Exception as e:
        logger.error(f"Error getting validations by namespace: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.get("/vpa/recommendations")
async def get_vpa_recommendations(
    namespace: Optional[str] = None,
    k8s_client=Depends(get_k8s_client)
):
    """Get VPA recommendations"""
    try:
        recommendations = await k8s_client.get_vpa_recommendations()

        if namespace:
            recommendations = [
                r for r in recommendations if r.namespace == namespace
            ]

        return recommendations

    except Exception as e:
        logger.error(f"Error getting VPA recommendations: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.post("/export")
async def export_report(
    export_request: ExportRequest,
    k8s_client=Depends(get_k8s_client),
    prometheus_client=Depends(get_prometheus_client)
):
    """Export a report in different formats"""
    try:
        # Collect data for the report
        pods = await k8s_client.get_all_pods()
        nodes_info = await k8s_client.get_nodes_info()

        # Filter by namespaces if specified
        if export_request.namespaces:
            pods = [p for p in pods if p.namespace in export_request.namespaces]

        # Validate resources
        all_validations = []
        for pod in pods:
            pod_validations = validation_service.validate_pod_resources(pod)
            all_validations.extend(pod_validations)

        # Get additional information
        overcommit_info = {}
        vpa_recommendations = []

        if export_request.include_vpa:
            vpa_recommendations = await k8s_client.get_vpa_recommendations()

        if export_request.include_validations:
            overcommit_info = await prometheus_client.get_cluster_overcommit()

        # Generate the report
        report = report_service.generate_cluster_report(
            pods=pods,
            validations=all_validations,
            vpa_recommendations=vpa_recommendations,
            overcommit_info=overcommit_info,
            nodes_info=nodes_info
        )

        # Export
        filepath = await report_service.export_report(report, export_request)

        return {
            "message": "Report exported successfully",
            "filepath": filepath,
            "format": export_request.format
        }

    except Exception as e:
        logger.error(f"Error exporting report: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.get("/export/files")
async def list_exported_files():
    """List exported files"""
    try:
        files = report_service.get_exported_reports()
        return files

    except Exception as e:
        logger.error(f"Error listing exported files: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.get("/export/files/{filename}")
async def download_exported_file(filename: str):
    """Download an exported file"""
    try:
        files = report_service.get_exported_reports()
        file_info = next((f for f in files if f["filename"] == filename), None)

        if not file_info:
            raise HTTPException(status_code=404, detail="File not found")

        return FileResponse(
            path=file_info["filepath"],
            filename=filename,
            media_type='application/octet-stream'
        )

    except HTTPException:
        # Let the 404 above propagate instead of being converted into a 500
        raise
    except Exception as e:
        logger.error(f"Error downloading file {filename}: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.post("/apply/recommendation")
async def apply_recommendation(
    recommendation: ApplyRecommendationRequest,
    k8s_client=Depends(get_k8s_client)
):
    """Apply a resource recommendation"""
    try:
        logger.info(f"Applying recommendation: {recommendation.action} {recommendation.resource_type} = {recommendation.value}")

        if recommendation.dry_run:
            return {
                "message": "Dry run - recommendation would be applied",
                "pod": recommendation.pod_name,
                "namespace": recommendation.namespace,
                "container": recommendation.container_name,
                "action": f"{recommendation.action} {recommendation.resource_type} = {recommendation.value}"
            }
        else:
            # Apply the recommendation by patching the deployment
            result = await _apply_resource_patch(
                recommendation.pod_name,
                recommendation.namespace,
                recommendation.container_name,
                recommendation.resource_type,
                recommendation.action,
                recommendation.value,
                k8s_client
            )

            return {
                "message": "Recommendation applied successfully",
                "pod": recommendation.pod_name,
                "namespace": recommendation.namespace,
                "container": recommendation.container_name,
                "action": f"{recommendation.action} {recommendation.resource_type} = {recommendation.value}",
                "result": result
            }

    except Exception as e:
        logger.error(f"Error applying recommendation: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.post("/recommendations/apply")
async def apply_smart_recommendation(
    recommendation: SmartRecommendation,
    dry_run: bool = True,
    k8s_client=Depends(get_k8s_client)
):
    """Apply a smart recommendation"""
    try:
        logger.info(f"Applying smart recommendation: {recommendation.title} for {recommendation.workload_name}")

        if dry_run:
            return {
                "message": "Dry run - recommendation would be applied",
                "workload": recommendation.workload_name,
                "namespace": recommendation.namespace,
                "type": recommendation.recommendation_type,
                "priority": recommendation.priority,
                "title": recommendation.title,
                "description": recommendation.description,
                "implementation_steps": recommendation.implementation_steps,
                "kubectl_commands": recommendation.kubectl_commands,
                "vpa_yaml": recommendation.vpa_yaml
            }

        # Apply the recommendation based on its type
        if recommendation.recommendation_type == "vpa_activation":
            result = await _apply_vpa_recommendation(recommendation, k8s_client)
        elif recommendation.recommendation_type == "resource_config":
            result = await _apply_resource_config_recommendation(recommendation, k8s_client)
        elif recommendation.recommendation_type == "ratio_adjustment":
            result = await _apply_ratio_adjustment_recommendation(recommendation, k8s_client)
        else:
            raise HTTPException(status_code=400, detail=f"Unknown recommendation type: {recommendation.recommendation_type}")

        return {
            "message": "Smart recommendation applied successfully",
            "workload": recommendation.workload_name,
            "namespace": recommendation.namespace,
            "type": recommendation.recommendation_type,
            "result": result
        }

    except HTTPException:
        # Preserve the 400 above instead of converting it into a 500
        raise
    except Exception as e:
        logger.error(f"Error applying smart recommendation: {e}")
        raise HTTPException(status_code=500, detail=str(e))

async def _apply_resource_patch(
    pod_name: str,
    namespace: str,
    container_name: str,
    resource_type: str,
    action: str,
    value: str,
    k8s_client
) -> dict:
    """Apply a resource patch to the deployment that owns the pod"""
    try:
        # Derive the deployment name from the pod name
        deployment_name = _extract_deployment_name(pod_name)

        # Create the patch body; `action` is expected to be "requests" or
        # "limits" and `resource_type` "cpu" or "memory"
        patch_body = {
            "spec": {
                "template": {
                    "spec": {
                        "containers": [{
                            "name": container_name,
                            "resources": {
                                action: {
                                    resource_type: value
                                }
                            }
                        }]
                    }
                }
            }
        }

        # Apply the patch
        result = await k8s_client.patch_deployment(deployment_name, namespace, patch_body)

        return {
            "deployment": deployment_name,
            "namespace": namespace,
            "container": container_name,
            "resource_type": resource_type,
            "action": action,
            "value": value,
            "result": result
        }

    except Exception as e:
        logger.error(f"Error applying resource patch: {e}")
        raise
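
# Note: with a Kubernetes strategic merge patch, entries in the
# spec.template.spec.containers list are merged by their "name" key, so
# patching a single named container leaves the pod's other containers
# untouched (assuming the underlying client issues a strategic merge patch).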

async def _apply_vpa_recommendation(recommendation: SmartRecommendation, k8s_client) -> dict:
    """Apply a VPA activation recommendation"""
    try:
        if not recommendation.vpa_yaml:
            raise ValueError("VPA YAML not provided in recommendation")

        # Apply the VPA YAML
        result = await k8s_client.apply_yaml(recommendation.vpa_yaml, recommendation.namespace)

        return {
            "type": "vpa_activation",
            "workload": recommendation.workload_name,
            "namespace": recommendation.namespace,
            "vpa_yaml_applied": True,
            "result": result
        }

    except Exception as e:
        logger.error(f"Error applying VPA recommendation: {e}")
        raise

async def _apply_resource_config_recommendation(recommendation: SmartRecommendation, k8s_client) -> dict:
    """Apply a resource configuration recommendation"""
    try:
        # For now, return the kubectl commands that should be executed.
        # In a full implementation these would run via the Kubernetes client.
        return {
            "type": "resource_config",
            "workload": recommendation.workload_name,
            "namespace": recommendation.namespace,
            "kubectl_commands": recommendation.kubectl_commands,
            "message": "Resource configuration commands prepared for execution"
        }

    except Exception as e:
        logger.error(f"Error applying resource config recommendation: {e}")
        raise

async def _apply_ratio_adjustment_recommendation(recommendation: SmartRecommendation, k8s_client) -> dict:
    """Apply a ratio adjustment recommendation"""
    try:
        # For now, return the kubectl commands that should be executed.
        # In a full implementation these would run via the Kubernetes client.
        return {
            "type": "ratio_adjustment",
            "workload": recommendation.workload_name,
            "namespace": recommendation.namespace,
            "kubectl_commands": recommendation.kubectl_commands,
            "message": "Ratio adjustment commands prepared for execution"
        }

    except Exception as e:
        logger.error(f"Error applying ratio adjustment recommendation: {e}")
        raise

def _extract_deployment_name(pod_name: str) -> str:
    """Extract the deployment name from a pod name"""
    # Remove the ReplicaSet hash and pod suffix (e.g. "app-74ffb8c66-9kpdg" -> "app")
    parts = pod_name.split('-')
    if len(parts) >= 3 and parts[-2].isalnum() and parts[-1].isalnum():
        return '-'.join(parts[:-2])
    return pod_name
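
# Caveat: unlike _extract_workload_name above, this heuristic does not check
# the suffix length, so a hyphenated bare pod name such as "my-cool-app" would
# be truncated to "my". It is only safe for pods created by a Deployment.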

@api_router.get("/validations/historical")
async def get_historical_validations(
    namespace: Optional[str] = None,
    time_range: str = "24h",
    k8s_client=Depends(get_k8s_client)
):
    """Get validations with historical analysis from Prometheus"""
    try:
        # Collect pods
        if namespace:
            namespace_resources = await k8s_client.get_namespace_resources(namespace)
            pods = namespace_resources.pods
        else:
            pods = await k8s_client.get_all_pods()

        # Validate with historical analysis (reuses the module-level service)
        all_validations = []
        for pod in pods:
            pod_validations = await validation_service.validate_pod_resources_with_historical_analysis(
                pod, time_range
            )
            all_validations.extend(pod_validations)

        return {
            "validations": all_validations,
            "total": len(all_validations),
            "time_range": time_range,
            "namespace": namespace or "all"
        }

    except Exception as e:
        logger.error(f"Error getting historical validations: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.get("/workloads/{namespace}/{workload}/metrics")
async def get_workload_historical_metrics(
    namespace: str,
    workload: str,
    time_range: str = "24h"
):
    """Get historical metrics for a specific workload with cluster percentages"""
    try:
        prometheus_client = PrometheusClient()
        await prometheus_client.initialize()

        # Get the cluster's total resources first
        cluster_cpu_query = 'sum(kube_node_status_allocatable{resource="cpu"})'
        cluster_memory_query = 'sum(kube_node_status_allocatable{resource="memory"})'

        cluster_cpu_data = await prometheus_client.query(cluster_cpu_query)
        cluster_memory_data = await prometheus_client.query(cluster_memory_query)

        # Extract cluster totals
        cluster_cpu_total = 0
        cluster_memory_total = 0

        if cluster_cpu_data.get("status") == "success" and cluster_cpu_data.get("data", {}).get("result"):
            for result in cluster_cpu_data["data"]["result"]:
                cluster_cpu_total += float(result["value"][1])

        if cluster_memory_data.get("status") == "success" and cluster_memory_data.get("data", {}).get("result"):
            for result in cluster_memory_data["data"]["result"]:
                cluster_memory_total += float(result["value"][1])

        # Get workload-specific metrics using a pod-name regex
        # (matches pods whose names start with the workload name)
        cpu_usage_query = f'rate(container_cpu_usage_seconds_total{{namespace="{namespace}", pod=~"{workload}.*"}}[5m])'
        memory_usage_query = f'container_memory_working_set_bytes{{namespace="{namespace}", pod=~"{workload}.*", container!="", image!=""}}'

        # Resource requests and limits for the workload's pods
        cpu_requests_query = f'sum(kube_pod_container_resource_requests{{namespace="{namespace}", pod=~"{workload}.*", resource="cpu"}})'
        memory_requests_query = f'sum(kube_pod_container_resource_requests{{namespace="{namespace}", pod=~"{workload}.*", resource="memory"}})'
        cpu_limits_query = f'sum(kube_pod_container_resource_limits{{namespace="{namespace}", pod=~"{workload}.*", resource="cpu"}})'
        memory_limits_query = f'sum(kube_pod_container_resource_limits{{namespace="{namespace}", pod=~"{workload}.*", resource="memory"}})'
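
        # Caveat: the pod=~"<workload>.*" matcher can over-match when one
        # workload's name is a prefix of another's (e.g. "web" also matches
        # "web-frontend-..." pods), so these sums may include unrelated pods.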

        # Execute the queries
        cpu_usage_data = await prometheus_client.query(cpu_usage_query)
        memory_usage_data = await prometheus_client.query(memory_usage_query)
        cpu_requests_data = await prometheus_client.query(cpu_requests_query)
        memory_requests_data = await prometheus_client.query(memory_requests_query)
        cpu_limits_data = await prometheus_client.query(cpu_limits_query)
        memory_limits_data = await prometheus_client.query(memory_limits_query)

        # Extract values
        cpu_usage = 0
        memory_usage = 0
        cpu_requests = 0
        memory_requests = 0
        cpu_limits = 0
        memory_limits = 0

        # Extract CPU usage
        if cpu_usage_data.get("status") == "success" and cpu_usage_data.get("data", {}).get("result"):
            for result in cpu_usage_data["data"]["result"]:
                cpu_usage += float(result["value"][1])

        # Extract Memory usage
        if memory_usage_data.get("status") == "success" and memory_usage_data.get("data", {}).get("result"):
            for result in memory_usage_data["data"]["result"]:
                memory_usage += float(result["value"][1])

        # Extract CPU requests
        if cpu_requests_data.get("status") == "success" and cpu_requests_data.get("data", {}).get("result"):
            for result in cpu_requests_data["data"]["result"]:
                cpu_requests += float(result["value"][1])

        # Extract Memory requests
        if memory_requests_data.get("status") == "success" and memory_requests_data.get("data", {}).get("result"):
            for result in memory_requests_data["data"]["result"]:
                memory_requests += float(result["value"][1])

        # Extract CPU limits
        if cpu_limits_data.get("status") == "success" and cpu_limits_data.get("data", {}).get("result"):
            for result in cpu_limits_data["data"]["result"]:
                cpu_limits += float(result["value"][1])

        # Extract Memory limits
        if memory_limits_data.get("status") == "success" and memory_limits_data.get("data", {}).get("result"):
            for result in memory_limits_data["data"]["result"]:
                memory_limits += float(result["value"][1])

        # Check whether we have real cluster data
        prometheus_available = cluster_cpu_total > 0 and cluster_memory_total > 0

        # If no real data is available, return zeros with an explanatory message
        if not prometheus_available:
            return {
                "workload": workload,
                "namespace": namespace,
                "time_range": time_range,
                "prometheus_available": False,
                "data_source": "no_data",
                "message": "No metrics data available for this workload",
                "cluster_total": {
                    "cpu_cores": 0,
                    "memory_bytes": 0,
                    "memory_gb": 0
                },
                "workload_metrics": {
                    "cpu": {
                        "usage_cores": 0,
                        "usage_percent": 0,
                        "requests_cores": 0,
                        "requests_percent": 0,
                        "limits_cores": 0,
                        "limits_percent": 0,
                        "efficiency_percent": 0
                    },
                    "memory": {
                        "usage_bytes": 0,
                        "usage_mb": 0,
                        "usage_percent": 0,
                        "requests_bytes": 0,
                        "requests_mb": 0,
                        "requests_percent": 0,
                        "limits_bytes": 0,
                        "limits_mb": 0,
                        "limits_percent": 0,
                        "efficiency_percent": 0
                    }
                }
            }

        # Calculate the workload's share of total cluster capacity
        cpu_usage_percent = (cpu_usage / cluster_cpu_total * 100) if cluster_cpu_total > 0 else 0
        memory_usage_percent = (memory_usage / cluster_memory_total * 100) if cluster_memory_total > 0 else 0
        cpu_requests_percent = (cpu_requests / cluster_cpu_total * 100) if cluster_cpu_total > 0 else 0
        memory_requests_percent = (memory_requests / cluster_memory_total * 100) if cluster_memory_total > 0 else 0
        cpu_limits_percent = (cpu_limits / cluster_cpu_total * 100) if cluster_cpu_total > 0 else 0
        memory_limits_percent = (memory_limits / cluster_memory_total * 100) if cluster_memory_total > 0 else 0

        # Calculate efficiency (usage vs requests)
        cpu_efficiency = (cpu_usage / cpu_requests * 100) if cpu_requests > 0 else 0
        memory_efficiency = (memory_usage / memory_requests * 100) if memory_requests > 0 else 0
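
        # Reading the efficiency numbers: ~100% means usage matches requests;
        # well below 100% suggests over-provisioning, while values above 100%
        # mean the workload is consuming more than it requested.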

        return {
            "workload": workload,
            "namespace": namespace,
            "time_range": time_range,
            "prometheus_available": True,
            "data_source": "prometheus",
            "timestamp": datetime.now().isoformat(),
            "cluster_total": {
                "cpu_cores": cluster_cpu_total,
                "memory_bytes": cluster_memory_total,
                "memory_gb": cluster_memory_total / (1024**3)
            },
            "workload_metrics": {
                "cpu": {
                    "usage_cores": cpu_usage,
                    "usage_percent": round(cpu_usage_percent, 2),
                    "requests_cores": cpu_requests,
                    "requests_percent": round(cpu_requests_percent, 2),
                    "limits_cores": cpu_limits,
                    "limits_percent": round(cpu_limits_percent, 2),
                    "efficiency_percent": round(cpu_efficiency, 1)
                },
                "memory": {
                    "usage_bytes": memory_usage,
                    "usage_mb": round(memory_usage / (1024**2), 2),
                    "usage_percent": round(memory_usage_percent, 2),
                    "requests_bytes": memory_requests,
                    "requests_mb": round(memory_requests / (1024**2), 2),
                    "requests_percent": round(memory_requests_percent, 2),
                    "limits_bytes": memory_limits,
                    "limits_mb": round(memory_limits / (1024**2), 2),
                    "limits_percent": round(memory_limits_percent, 2),
                    "efficiency_percent": round(memory_efficiency, 1)
                }
            },
            "promql_queries": {
                "cluster_cpu_total": cluster_cpu_query,
                "cluster_memory_total": cluster_memory_query,
                "cpu_usage": cpu_usage_query,
                "memory_usage": memory_usage_query,
                "cpu_requests": cpu_requests_query,
                "memory_requests": memory_requests_query,
                "cpu_limits": cpu_limits_query,
                "memory_limits": memory_limits_query
            }
        }

    except Exception as e:
        logger.error(f"Error getting workload metrics for {namespace}/{workload}: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.get("/cluster/historical-summary")
async def get_cluster_historical_summary(
    time_range: str = "24h"
):
    """Get the cluster historical summary"""
    try:
        historical_service = HistoricalAnalysisService()
        summary = await historical_service.get_cluster_historical_summary(time_range)

        return {
            "summary": summary,
            "time_range": time_range,
            "timestamp": datetime.now().isoformat()
        }

    except Exception as e:
        logger.error(f"Error getting historical summary: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.get("/namespace/{namespace}/historical-analysis")
async def get_namespace_historical_analysis(
    namespace: str,
    time_range: str = "24h",
    k8s_client=Depends(get_k8s_client),
    prometheus_client=Depends(get_prometheus_client)
):
    """Get historical analysis for a specific namespace"""
    try:
        historical_service = HistoricalAnalysisService()

        # Get historical analysis for the namespace
        analysis = await historical_service.get_namespace_historical_analysis(
            namespace, time_range, k8s_client
        )

        return {
            "namespace": namespace,
            "time_range": time_range,
            "analysis": analysis,
            "timestamp": datetime.now().isoformat()
        }

    except Exception as e:
        logger.error(f"Error getting historical analysis for namespace {namespace}: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.get("/namespace/{namespace}/workload/{workload}/historical-analysis")
async def get_workload_historical_analysis(
    namespace: str,
    workload: str,
    time_range: str = "24h",
    prometheus_client=Depends(get_prometheus_client)
):
    """Get historical analysis for a specific workload/deployment"""
    try:
        historical_service = HistoricalAnalysisService()

        # Get historical analysis for the workload
        analysis = await historical_service.get_workload_historical_analysis(
            namespace, workload, time_range
        )

        return {
            "namespace": namespace,
            "workload": workload,
            "time_range": time_range,
            "analysis": analysis,
            "timestamp": datetime.now().isoformat()
        }

    except Exception as e:
        logger.error(f"Error getting historical analysis for workload {workload} in namespace {namespace}: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.get("/namespace/{namespace}/pod/{pod_name}/historical-analysis")
async def get_pod_historical_analysis(
    namespace: str,
    pod_name: str,
    time_range: str = "24h",
    prometheus_client=Depends(get_prometheus_client)
):
    """Get historical analysis for a specific pod (legacy endpoint)"""
    try:
        historical_service = HistoricalAnalysisService()

        # Get historical analysis for the pod
        analysis = await historical_service.get_pod_historical_analysis(
            namespace, pod_name, time_range
        )

        return {
            "namespace": namespace,
            "pod_name": pod_name,
            "time_range": time_range,
            "analysis": analysis,
            "timestamp": datetime.now().isoformat()
        }

    except Exception as e:
        logger.error(f"Error getting historical analysis for pod {pod_name} in namespace {namespace}: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.get("/smart-recommendations")
async def get_smart_recommendations(
    namespace: Optional[str] = None,
    priority: Optional[str] = None,
    k8s_client=Depends(get_k8s_client)
):
    """Get smart recommendations for workloads"""
    try:
        # Collect pods
        if namespace:
            namespace_resources = await k8s_client.get_namespace_resources(namespace)
            pods = namespace_resources.pods
        else:
            pods = await k8s_client.get_all_pods()

        # Get workload categories
        categories = await validation_service.get_workload_categories(pods)

        # Get smart recommendations
        recommendations = await validation_service.get_smart_recommendations(pods)

        # Filter by priority if specified
        if priority:
            recommendations = [
                r for r in recommendations if r.priority == priority
            ]

        return {
            "recommendations": recommendations,
            "categories": categories,
            "total": len(recommendations)
        }

    except Exception as e:
        logger.error(f"Error getting smart recommendations: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.get("/workload-categories")
async def get_workload_categories(
    namespace: Optional[str] = None,
    k8s_client=Depends(get_k8s_client)
):
    """Get workload categories analysis"""
    try:
        # Collect pods
        if namespace:
            namespace_resources = await k8s_client.get_namespace_resources(namespace)
            pods = namespace_resources.pods
        else:
            pods = await k8s_client.get_all_pods()

        # Get workload categories
        categories = await validation_service.get_workload_categories(pods)

        # Group by category
        category_summary = {}
        for category in categories:
            cat_type = category.category
            if cat_type not in category_summary:
                category_summary[cat_type] = {
                    "count": 0,
                    "total_priority_score": 0,
                    "workloads": []
                }

            category_summary[cat_type]["count"] += 1
            category_summary[cat_type]["total_priority_score"] += category.priority_score
            category_summary[cat_type]["workloads"].append({
                "name": category.workload_name,
                "namespace": category.namespace,
                "priority_score": category.priority_score,
                "estimated_impact": category.estimated_impact,
                "vpa_candidate": category.vpa_candidate
            })

        # Calculate average priority scores
        for cat_type in category_summary:
            if category_summary[cat_type]["count"] > 0:
                category_summary[cat_type]["average_priority_score"] = (
                    category_summary[cat_type]["total_priority_score"] /
                    category_summary[cat_type]["count"]
                )

        return {
            "categories": category_summary,
            "total_workloads": len(categories),
            "timestamp": datetime.now().isoformat()
        }

    except Exception as e:
        logger.error(f"Error getting workload categories: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.get("/validations/smart")
async def get_smart_validations(
    namespace: Optional[str] = None,
    severity: Optional[str] = None,
    workload_category: Optional[str] = None,
    page: int = 1,
    page_size: int = 50,
    k8s_client=Depends(get_k8s_client)
):
    """Get validations with smart analysis and categorization"""
    try:
        # Collect pods
        if namespace:
            namespace_resources = await k8s_client.get_namespace_resources(namespace)
            pods = namespace_resources.pods
        else:
            pods = await k8s_client.get_all_pods()

        # Get smart validations
        all_validations = []
        for pod in pods:
            pod_validations = await validation_service.validate_pod_resources_with_smart_analysis(pod)
            all_validations.extend(pod_validations)

        # Filter by severity if specified
        if severity:
            all_validations = [
                v for v in all_validations if v.severity == severity
            ]

        # Filter by workload category if specified
        if workload_category:
            all_validations = [
                v for v in all_validations if v.workload_category == workload_category
            ]

        # Sort by priority score (descending)
        all_validations.sort(key=lambda x: x.priority_score or 0, reverse=True)

        # Pagination
        total = len(all_validations)
        start = (page - 1) * page_size
        end = start + page_size
        paginated_validations = all_validations[start:end]

        return {
            "validations": paginated_validations,
            "pagination": {
                "page": page,
                "page_size": page_size,
                "total": total,
                "total_pages": (total + page_size - 1) // page_size
            },
            "summary": {
                "total_validations": total,
                "by_severity": {
                    "critical": len([v for v in all_validations if v.severity == "critical"]),
                    "error": len([v for v in all_validations if v.severity == "error"]),
                    "warning": len([v for v in all_validations if v.severity == "warning"]),
                    "info": len([v for v in all_validations if v.severity == "info"])
                },
                "by_category": {
                    "new": len([v for v in all_validations if v.workload_category == "new"]),
                    "established": len([v for v in all_validations if v.workload_category == "established"]),
                    "outlier": len([v for v in all_validations if v.workload_category == "outlier"]),
                    "compliant": len([v for v in all_validations if v.workload_category == "compliant"])
                }
            }
        }

    except Exception as e:
        logger.error(f"Error getting smart validations: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.get("/cluster-health")
async def get_cluster_health(k8s_client=Depends(get_k8s_client)):
    """Get a cluster health overview with overcommit analysis"""
    try:
        pods = await k8s_client.get_all_pods()
        cluster_health = await validation_service.get_cluster_health(pods)
        return cluster_health
    except Exception as e:
        logger.error(f"Error getting cluster health: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.get("/qos-classification")
async def get_qos_classification(
    namespace: Optional[str] = None,
    k8s_client=Depends(get_k8s_client)
):
    """Get QoS classification for pods"""
    try:
        if namespace:
            namespace_resources = await k8s_client.get_namespace_resources(namespace)
            pods = namespace_resources.pods
        else:
            pods = await k8s_client.get_all_pods()

        qos_classifications = []
        for pod in pods:
            qos = validation_service.classify_qos(pod)
            qos_classifications.append(qos)

        return {
            "qos_classifications": qos_classifications,
            "total_pods": len(pods),
            "distribution": {
                "Guaranteed": len([q for q in qos_classifications if q.qos_class == "Guaranteed"]),
                "Burstable": len([q for q in qos_classifications if q.qos_class == "Burstable"]),
                "BestEffort": len([q for q in qos_classifications if q.qos_class == "BestEffort"])
            }
        }
    except Exception as e:
        logger.error(f"Error getting QoS classification: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.get("/namespace-distribution")
async def get_namespace_distribution(
    k8s_client=Depends(get_k8s_client),
    prometheus_client=Depends(get_prometheus_client)
):
    """Get resource distribution by namespace for dashboard charts"""
    try:
        # Get all pods
        pods = await k8s_client.get_all_pods()

        # Group pods by namespace and sum their resource requests/limits
        namespace_resources = {}

        for pod in pods:
            namespace = pod.namespace

            if namespace not in namespace_resources:
                namespace_resources[namespace] = {
                    'namespace': namespace,
                    'cpu_requests': 0.0,
                    'memory_requests': 0.0,
                    'cpu_limits': 0.0,
                    'memory_limits': 0.0,
                    'pod_count': 0
                }

            # Sum up resources from all containers in the pod
            for container in pod.containers:
                resources = container.get('resources', {})

                # CPU requests and limits
                cpu_req = resources.get('requests', {}).get('cpu', '0')
                cpu_lim = resources.get('limits', {}).get('cpu', '0')

                # Memory requests and limits
                mem_req = resources.get('requests', {}).get('memory', '0')
                mem_lim = resources.get('limits', {}).get('memory', '0')

                # Convert to numeric values
                namespace_resources[namespace]['cpu_requests'] += _parse_cpu_value(cpu_req)
                namespace_resources[namespace]['cpu_limits'] += _parse_cpu_value(cpu_lim)
                namespace_resources[namespace]['memory_requests'] += _parse_memory_value(mem_req)
                namespace_resources[namespace]['memory_limits'] += _parse_memory_value(mem_lim)

            namespace_resources[namespace]['pod_count'] += 1

        # Convert to a list
        distribution_data = []
        for namespace, data in namespace_resources.items():
            distribution_data.append({
                'namespace': namespace,
                'cpu_requests': data['cpu_requests'],
                'memory_requests': data['memory_requests'],
                'cpu_limits': data['cpu_limits'],
                'memory_limits': data['memory_limits'],
                'pod_count': data['pod_count']
            })

        # Sort by CPU requests descending
        distribution_data.sort(key=lambda x: x['cpu_requests'], reverse=True)

        # Take the top 10 namespaces and group the rest into "Others"
        top_namespaces = distribution_data[:10]
        others_data = distribution_data[10:]

        # Calculate the "Others" total
        others_total = {
            'namespace': 'Others',
            'cpu_requests': sum(ns['cpu_requests'] for ns in others_data),
            'memory_requests': sum(ns['memory_requests'] for ns in others_data),
            'cpu_limits': sum(ns['cpu_limits'] for ns in others_data),
            'memory_limits': sum(ns['memory_limits'] for ns in others_data),
            'pod_count': sum(ns['pod_count'] for ns in others_data)
        }

        # Add "Others" only if it contains anything
        if others_total['cpu_requests'] > 0 or others_total['memory_requests'] > 0:
            top_namespaces.append(others_total)

        return {
            'distribution': top_namespaces,
            'total_namespaces': len(distribution_data),
            'total_cpu_requests': sum(ns['cpu_requests'] for ns in distribution_data),
            'total_memory_requests': sum(ns['memory_requests'] for ns in distribution_data)
        }

    except Exception as e:
        logger.error(f"Error getting namespace distribution: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.get("/overcommit-by-namespace")
async def get_overcommit_by_namespace(
    k8s_client=Depends(get_k8s_client),
    prometheus_client=Depends(get_prometheus_client)
):
    """Get overcommit status by namespace for dashboard charts"""
    try:
        # Get all pods
        pods = await k8s_client.get_all_pods()

        # Group pods by namespace and sum their resource requests/limits
        namespace_resources = {}

        for pod in pods:
            namespace = pod.namespace

            if namespace not in namespace_resources:
                namespace_resources[namespace] = {
                    'namespace': namespace,
                    'cpu_requests': 0.0,
                    'memory_requests': 0.0,
                    'cpu_limits': 0.0,
                    'memory_limits': 0.0,
                    'pod_count': 0
                }

            # Sum up resources from all containers in the pod
            for container in pod.containers:
                resources = container.get('resources', {})

                # CPU requests and limits
                cpu_req = resources.get('requests', {}).get('cpu', '0')
                cpu_lim = resources.get('limits', {}).get('cpu', '0')

                # Memory requests and limits
                mem_req = resources.get('requests', {}).get('memory', '0')
                mem_lim = resources.get('limits', {}).get('memory', '0')

                # Convert to numeric values
                namespace_resources[namespace]['cpu_requests'] += _parse_cpu_value(cpu_req)
                namespace_resources[namespace]['cpu_limits'] += _parse_cpu_value(cpu_lim)
                namespace_resources[namespace]['memory_requests'] += _parse_memory_value(mem_req)
                namespace_resources[namespace]['memory_limits'] += _parse_memory_value(mem_lim)

            namespace_resources[namespace]['pod_count'] += 1

        # Get cluster capacity from Prometheus
        overcommit_info = await prometheus_client.get_cluster_overcommit()

        # Calculate cluster capacity
        cpu_capacity = 0
        memory_capacity = 0

        if overcommit_info and overcommit_info.get("cpu") and overcommit_info.get("memory"):
            # Get CPU capacity
            if overcommit_info["cpu"].get("capacity", {}).get("status") == "success":
                for result in overcommit_info["cpu"]["capacity"].get("data", {}).get("result", []):
                    cpu_capacity += float(result.get("value", [0, "0"])[1])

            # Get Memory capacity
            if overcommit_info["memory"].get("capacity", {}).get("status") == "success":
                for result in overcommit_info["memory"]["capacity"].get("data", {}).get("result", []):
                    memory_capacity += float(result.get("value", [0, "0"])[1])

        # Calculate the overcommit percentage for each namespace
        overcommit_data = []
        for namespace, data in namespace_resources.items():
            # CPU overcommit percentage (namespace requests vs cluster capacity)
            cpu_overcommit = 0
            if cpu_capacity > 0:
                cpu_overcommit = (data['cpu_requests'] / cpu_capacity) * 100

            # Memory overcommit percentage
            memory_overcommit = 0
            if memory_capacity > 0:
                memory_overcommit = (data['memory_requests'] / memory_capacity) * 100

            overcommit_data.append({
                'namespace': namespace,
                'cpu_overcommit': round(cpu_overcommit, 1),
                'memory_overcommit': round(memory_overcommit, 1),
                'cpu_requests': data['cpu_requests'],
                'memory_requests': data['memory_requests'],
                'pod_count': data['pod_count']
            })

        # Sort by CPU overcommit descending
        overcommit_data.sort(key=lambda x: x['cpu_overcommit'], reverse=True)

        # Take the top 10 namespaces
        top_overcommit = overcommit_data[:10]

        return {
            'overcommit': top_overcommit,
            'total_namespaces': len(overcommit_data),
            'cluster_cpu_capacity': cpu_capacity,
            'cluster_memory_capacity': memory_capacity
        }

    except Exception as e:
        logger.error(f"Error getting overcommit by namespace: {e}")
        raise HTTPException(status_code=500, detail=str(e))

def _parse_cpu_value(cpu_str: str) -> float:
    """Parse a CPU quantity string into cores (e.g. '100m' -> 0.1, '1' -> 1.0)"""
    if not cpu_str or cpu_str == '0':
        return 0.0

    cpu_str = str(cpu_str).strip()

    if cpu_str.endswith('m'):
        # millicores
        return float(cpu_str[:-1]) / 1000.0
    elif cpu_str.endswith('u'):
        # microcores (also a valid Kubernetes quantity suffix)
        return float(cpu_str[:-1]) / 1000000.0
    elif cpu_str.endswith('n'):
        # nanocores (as reported by the metrics API)
        return float(cpu_str[:-1]) / 1000000000.0
    else:
        return float(cpu_str)
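
# Quick sanity checks (illustrative values only):
#   _parse_cpu_value("250m")     -> 0.25
#   _parse_cpu_value("2")        -> 2.0
#   _parse_cpu_value("1500000n") -> 0.0015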

def _parse_memory_value(mem_str: str) -> float:
    """Parse a memory quantity string into bytes (e.g. '128Mi' -> 134217728, '1Gi' -> 1073741824)"""
    if not mem_str or mem_str == '0':
        return 0.0

    mem_str = str(mem_str).strip()

    if mem_str.endswith('Ki'):
        return float(mem_str[:-2]) * 1024
    elif mem_str.endswith('Mi'):
        return float(mem_str[:-2]) * 1024 * 1024
    elif mem_str.endswith('Gi'):
        return float(mem_str[:-2]) * 1024 * 1024 * 1024
    elif mem_str.endswith('Ti'):
        return float(mem_str[:-2]) * 1024 * 1024 * 1024 * 1024
    elif mem_str.endswith('K') or mem_str.endswith('k'):
        # Kubernetes uses lowercase "k" for the decimal kilo suffix
        return float(mem_str[:-1]) * 1000
    elif mem_str.endswith('M'):
        return float(mem_str[:-1]) * 1000 * 1000
    elif mem_str.endswith('G'):
        return float(mem_str[:-1]) * 1000 * 1000 * 1000
    elif mem_str.endswith('T'):
        return float(mem_str[:-1]) * 1000 * 1000 * 1000 * 1000
    else:
        return float(mem_str)
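
# Note: Ki/Mi/Gi/Ti are binary (1024-based) suffixes while k/M/G/T are
# treated as decimal (1000-based) here; all values are returned in bytes.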

@api_router.get("/resource-quotas")
async def get_resource_quotas(
    namespace: Optional[str] = None,
    k8s_client=Depends(get_k8s_client)
):
    """Get Resource Quota analysis"""
    try:
        if namespace:
            namespaces = [namespace]
        else:
            pods = await k8s_client.get_all_pods()
            namespaces = list(set(pod.namespace for pod in pods))

        quotas = await validation_service.analyze_resource_quotas(namespaces)

        # Guard against division by zero when no namespaces were found
        coverage_percentage = (
            len([q for q in quotas if q.status == "Active"]) / len(namespaces) * 100
            if namespaces else 0
        )

        return {
            "resource_quotas": quotas,
            "total_namespaces": len(namespaces),
            "coverage_percentage": coverage_percentage
        }
    except Exception as e:
        logger.error(f"Error getting resource quotas: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.get("/pod-health-scores")
async def get_pod_health_scores(
    namespace: Optional[str] = None,
    k8s_client=Depends(get_k8s_client)
):
    """Get simplified pod health scores with grouped validations"""
    try:
        # Get pods
        pods = await k8s_client.get_all_pods()

        if namespace:
            pods = [pod for pod in pods if pod.namespace == namespace]

        health_scores = []

        for pod in pods:
            # Get validations for this pod
            pod_validations = validation_service.validate_pod_resources(pod)

            # Calculate the health score
            health_score = validation_service.calculate_pod_health_score(pod, pod_validations)
            health_scores.append(health_score)

        # Sort by health score (worst first)
        health_scores.sort(key=lambda x: x.health_score)

        return {
            "pods": health_scores,
            "total_pods": len(health_scores),
            "summary": {
                "excellent": len([h for h in health_scores if h.health_score >= 9]),
                "good": len([h for h in health_scores if 7 <= h.health_score < 9]),
                "medium": len([h for h in health_scores if 5 <= h.health_score < 7]),
                "poor": len([h for h in health_scores if 3 <= h.health_score < 5]),
                "critical": len([h for h in health_scores if h.health_score < 3])
            }
        }

    except Exception as e:
        logger.error(f"Error getting pod health scores: {e}")
        raise HTTPException(status_code=500, detail=str(e))

# NOTE: this path duplicates the /smart-recommendations route defined above.
# FastAPI serves the first registration for a given path, so this richer
# handler is effectively unreachable until one of the two is renamed or removed.
# The function is renamed here so it at least no longer shadows the earlier
# handler's Python name.
@api_router.get("/smart-recommendations")
async def get_smart_recommendations_detailed(
    namespace: Optional[str] = None,
    priority: Optional[str] = None,
    k8s_client=Depends(get_k8s_client)
):
    """Get smart recommendations for resource optimization"""
    try:
        # Get all pods
        pods = await k8s_client.get_all_pods()

        if namespace:
            pods = [pod for pod in pods if pod.namespace == namespace]

        # Categorize workloads
        categories = await smart_recommendations_service.categorize_workloads(pods)

        # Generate smart recommendations
        recommendations = await smart_recommendations_service.generate_smart_recommendations(pods, categories)

        # Filter by priority if specified
        if priority:
            recommendations = [r for r in recommendations if r.priority == priority]

        # Group by namespace
        recommendations_by_namespace = {}
        for rec in recommendations:
            if rec.namespace not in recommendations_by_namespace:
                recommendations_by_namespace[rec.namespace] = []
            recommendations_by_namespace[rec.namespace].append(rec)

        # Calculate summary
        summary = {
            "total_recommendations": len(recommendations),
            "by_priority": {
                "critical": len([r for r in recommendations if r.priority == "critical"]),
                "high": len([r for r in recommendations if r.priority == "high"]),
                "medium": len([r for r in recommendations if r.priority == "medium"]),
                "low": len([r for r in recommendations if r.priority == "low"])
            },
            "by_type": {
                "resource_config": len([r for r in recommendations if r.recommendation_type == "resource_config"]),
                "vpa_activation": len([r for r in recommendations if r.recommendation_type == "vpa_activation"]),
                "ratio_adjustment": len([r for r in recommendations if r.recommendation_type == "ratio_adjustment"])
            },
            "namespaces_affected": len(recommendations_by_namespace)
        }

        return {
            "recommendations": recommendations,
            "categories": categories,
            "grouped_by_namespace": recommendations_by_namespace,
            "summary": summary,
            "timestamp": datetime.now().isoformat()
        }

    except Exception as e:
        logger.error(f"Error getting smart recommendations: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
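# Example request (the namespace and priority values are hypothetical):
#
#   GET /api/v1/smart-recommendations?namespace=payments&priority=high
#
# returns only high-priority recommendations for the "payments" namespace,
# grouped and summarized in the response shape built above.
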
@api_router.get("/historical-analysis")
|
|
async def get_historical_analysis(
|
|
time_range: str = "24h",
|
|
k8s_client=Depends(get_k8s_client),
|
|
prometheus_client=Depends(get_prometheus_client)
|
|
):
|
|
"""Get historical analysis for all workloads"""
|
|
try:
|
|
# Get all pods
|
|
pods = await k8s_client.get_all_pods()
|
|
|
|
# Group pods by workload
|
|
workloads = {}
|
|
for pod in pods:
|
|
# Extract workload name from pod name (remove replica set suffix)
|
|
workload_name = _extract_workload_name(pod.name)
|
|
namespace = pod.namespace
|
|
|
|
if workload_name not in workloads:
|
|
workloads[workload_name] = {
|
|
'name': workload_name,
|
|
'namespace': namespace,
|
|
'pods': []
|
|
}
|
|
workloads[workload_name]['pods'].append(pod)
|
|
|
|
# Convert to list and add basic info with real CPU/Memory data
|
|
workload_list = []
|
|
historical_service = HistoricalAnalysisService()
|
|
|
|
for workload_name, workload_data in workloads.items():
|
|
# Get current CPU and Memory usage using OpenShift Console queries
|
|
try:
|
|
cpu_usage = await historical_service.get_workload_cpu_summary(workload_data['namespace'], workload_name)
|
|
memory_usage = await historical_service.get_workload_memory_summary(workload_data['namespace'], workload_name)
|
|
|
|
# Format CPU usage (cores)
|
|
cpu_display = f"{cpu_usage:.3f} cores" if cpu_usage > 0 else "N/A"
|
|
|
|
# Format memory usage (MB)
|
|
memory_display = f"{memory_usage / (1024 * 1024):.1f} MB" if memory_usage > 0 else "N/A"
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Error getting summary for {workload_name}: {e}")
|
|
cpu_display = "N/A"
|
|
memory_display = "N/A"
|
|
|
|
workload_list.append({
|
|
'name': workload_name,
|
|
'namespace': workload_data['namespace'],
|
|
'pod_count': len(workload_data['pods']),
|
|
'cpu_usage': cpu_display,
|
|
'memory_usage': memory_display,
|
|
'last_updated': datetime.now().isoformat()
|
|
})
|
|
|
|
return {
|
|
"workloads": workload_list,
|
|
"total_workloads": len(workload_list),
|
|
"timestamp": datetime.now().isoformat()
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting historical analysis: {str(e)}")
|
|
raise HTTPException(status_code=500, detail=f"Error getting historical analysis: {str(e)}")
|
|
|
|
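# Shape of one entry in the "workloads" list above (all values illustrative,
# names hypothetical):
#
#   {
#       "name": "checkout-api",
#       "namespace": "shop",
#       "pod_count": 2,
#       "cpu_usage": "0.012 cores",
#       "memory_usage": "182.4 MB",
#       "last_updated": "2024-01-01T12:00:00"
#   }
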
@api_router.get("/historical-analysis/{namespace}/{workload}")
|
|
async def get_workload_historical_details(
|
|
namespace: str,
|
|
workload: str,
|
|
time_range: str = "24h",
|
|
k8s_client=Depends(get_k8s_client),
|
|
prometheus_client=Depends(get_prometheus_client)
|
|
):
|
|
"""Get detailed historical analysis for a specific workload"""
|
|
try:
|
|
# Get all pods and filter by namespace and workload
|
|
all_pods = await k8s_client.get_all_pods()
|
|
workload_pods = [
|
|
pod for pod in all_pods
|
|
if pod.namespace == namespace and _extract_workload_name(pod.name) == workload
|
|
]
|
|
|
|
if not workload_pods:
|
|
raise HTTPException(status_code=404, detail=f"Workload {workload} not found in namespace {namespace}")
|
|
|
|
# Get historical data from Prometheus
|
|
historical_service = HistoricalAnalysisService()
|
|
|
|
# Get CPU and memory usage over time
|
|
cpu_data = await historical_service.get_cpu_usage_history(namespace, workload, time_range)
|
|
memory_data = await historical_service.get_memory_usage_history(namespace, workload, time_range)
|
|
|
|
# Generate recommendations and get workload summary
|
|
recommendations, workload_summary = await historical_service.generate_recommendations(namespace, workload, time_range)
|
|
|
|
return {
|
|
"workload": workload,
|
|
"namespace": namespace,
|
|
"cpu_data": cpu_data,
|
|
"memory_data": memory_data,
|
|
"recommendations": recommendations,
|
|
"workload_summary": workload_summary,
|
|
"timestamp": datetime.now().isoformat()
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error getting workload historical details: {str(e)}")
|
|
raise HTTPException(status_code=500, detail=f"Error getting workload details: {str(e)}")
|
|
|
|
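# Example call (host, namespace, and workload names are hypothetical):
#
#   curl "http://localhost:8000/api/v1/historical-analysis/shop/checkout-api?time_range=7d"
#
# returns CPU/memory history plus recommendations for the checkout-api
# workload in the shop namespace over the last 7 days.
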
@api_router.get("/vpa/list")
|
|
async def list_vpas(
|
|
namespace: Optional[str] = None,
|
|
k8s_client=Depends(get_k8s_client)
|
|
):
|
|
"""List VPA resources"""
|
|
try:
|
|
vpas = await k8s_client.list_vpas(namespace)
|
|
return {
|
|
"vpas": vpas,
|
|
"count": len(vpas),
|
|
"namespace": namespace or "all"
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error listing VPAs: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@api_router.post("/vpa/create")
|
|
async def create_vpa(
|
|
namespace: str,
|
|
vpa_manifest: dict,
|
|
k8s_client=Depends(get_k8s_client)
|
|
):
|
|
"""Create a VPA resource"""
|
|
try:
|
|
result = await k8s_client.create_vpa(namespace, vpa_manifest)
|
|
return {
|
|
"message": "VPA created successfully",
|
|
"vpa": result,
|
|
"namespace": namespace
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error creating VPA: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
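# Minimal sketch of a vpa_manifest payload, assuming the standard
# autoscaling.k8s.io/v1 VerticalPodAutoscaler schema (the target deployment
# name is hypothetical):
#
#   {
#       "apiVersion": "autoscaling.k8s.io/v1",
#       "kind": "VerticalPodAutoscaler",
#       "metadata": {"name": "checkout-api-vpa"},
#       "spec": {
#           "targetRef": {
#               "apiVersion": "apps/v1",
#               "kind": "Deployment",
#               "name": "checkout-api"
#           },
#           "updatePolicy": {"updateMode": "Off"}  # recommend only, no evictions
#       }
#   }
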
@api_router.delete("/vpa/{vpa_name}")
async def delete_vpa(
    vpa_name: str,
    namespace: str,
    k8s_client=Depends(get_k8s_client)
):
    """Delete a VPA resource"""
    try:
        result = await k8s_client.delete_vpa(vpa_name, namespace)
        return {
            "message": "VPA deleted successfully",
            "vpa_name": vpa_name,
            "namespace": namespace
        }
    except Exception as e:
        logger.error(f"Error deleting VPA: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@api_router.get("/health")
async def health_check():
    """API health check"""
    return {
        "status": "healthy",
        "service": "resource-governance-api",
        "version": "1.0.0"
    }

# ============================================================================
# OPTIMIZED ENDPOINTS - 10x Performance Improvement
# ============================================================================

@api_router.get("/optimized/workloads/{namespace}/metrics")
|
|
async def get_optimized_workloads_metrics(
|
|
namespace: str,
|
|
time_range: str = "24h"
|
|
):
|
|
"""Get optimized metrics for ALL workloads in namespace using aggregated queries"""
|
|
try:
|
|
from app.services.historical_analysis import HistoricalAnalysisService
|
|
|
|
historical_service = HistoricalAnalysisService()
|
|
workloads_metrics = await historical_service.get_optimized_workloads_metrics(namespace, time_range)
|
|
|
|
return {
|
|
"namespace": namespace,
|
|
"time_range": time_range,
|
|
"workloads_count": len(workloads_metrics),
|
|
"workloads": [
|
|
{
|
|
"workload_name": w.workload_name,
|
|
"cpu_usage_cores": w.cpu_usage_cores,
|
|
"cpu_usage_percent": w.cpu_usage_percent,
|
|
"cpu_requests_cores": w.cpu_requests_cores,
|
|
"cpu_requests_percent": w.cpu_requests_percent,
|
|
"cpu_limits_cores": w.cpu_limits_cores,
|
|
"cpu_limits_percent": w.cpu_limits_percent,
|
|
"memory_usage_mb": w.memory_usage_mb,
|
|
"memory_usage_percent": w.memory_usage_percent,
|
|
"memory_requests_mb": w.memory_requests_mb,
|
|
"memory_requests_percent": w.memory_requests_percent,
|
|
"memory_limits_mb": w.memory_limits_mb,
|
|
"memory_limits_percent": w.memory_limits_percent,
|
|
"cpu_efficiency_percent": w.cpu_efficiency_percent,
|
|
"memory_efficiency_percent": w.memory_efficiency_percent,
|
|
"timestamp": w.timestamp.isoformat()
|
|
}
|
|
for w in workloads_metrics
|
|
],
|
|
"performance_metrics": {
|
|
"optimization_factor": "10x",
|
|
"queries_used": 1, # Single aggregated query
|
|
"cache_enabled": True
|
|
}
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting optimized workload metrics: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
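# The "single aggregated query" above amounts to grouping one PromQL
# expression by pod rather than issuing one query per workload. An
# illustrative sketch of that idea (the actual expression lives inside
# HistoricalAnalysisService and may differ):
#
#   sum by (pod) (
#       rate(container_cpu_usage_seconds_total{namespace="<namespace>", container!=""}[5m])
#   )
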
@api_router.get("/optimized/cluster/totals")
|
|
async def get_optimized_cluster_totals():
|
|
"""Get cluster total resources using optimized query"""
|
|
try:
|
|
from app.services.historical_analysis import HistoricalAnalysisService
|
|
|
|
historical_service = HistoricalAnalysisService()
|
|
cluster_metrics = await historical_service.get_optimized_cluster_totals()
|
|
|
|
return {
|
|
"cpu_cores_total": cluster_metrics.cpu_cores_total,
|
|
"memory_bytes_total": cluster_metrics.memory_bytes_total,
|
|
"memory_gb_total": cluster_metrics.memory_gb_total,
|
|
"performance_metrics": {
|
|
"optimization_factor": "2x",
|
|
"queries_used": 1, # Single aggregated query
|
|
"cache_enabled": True
|
|
}
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting optimized cluster totals: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
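# Illustrative PromQL for cluster-wide totals (assuming the standard
# kubelet/cAdvisor machine metrics; the service's actual queries may differ):
#
#   sum(machine_cpu_cores)      -> cpu_cores_total
#   sum(machine_memory_bytes)   -> memory_bytes_total
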
@api_router.get("/optimized/workloads/{namespace}/{workload}/peak-usage")
|
|
async def get_optimized_workload_peak_usage(
|
|
namespace: str,
|
|
workload: str,
|
|
time_range: str = "7d"
|
|
):
|
|
"""Get peak usage for workload using MAX_OVER_TIME"""
|
|
try:
|
|
from app.services.historical_analysis import HistoricalAnalysisService
|
|
|
|
historical_service = HistoricalAnalysisService()
|
|
peak_data = await historical_service.get_optimized_workload_peak_usage(namespace, workload, time_range)
|
|
|
|
return {
|
|
"workload": workload,
|
|
"namespace": namespace,
|
|
"time_range": time_range,
|
|
"peak_usage": peak_data,
|
|
"performance_metrics": {
|
|
"optimization_factor": "5x",
|
|
"queries_used": 2, # MAX_OVER_TIME queries
|
|
"cache_enabled": True
|
|
}
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting optimized peak usage: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
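# Sketch of the max_over_time idea using PromQL subqueries, one query each for
# CPU and memory (illustrative only; the service's real expressions may differ):
#
#   max_over_time(
#       sum(rate(container_cpu_usage_seconds_total{namespace="<ns>", pod=~"<workload>-.*"}[5m]))[7d:5m]
#   )
#   max_over_time(
#       sum(container_memory_working_set_bytes{namespace="<ns>", pod=~"<workload>-.*"})[7d:5m]
#   )
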
@api_router.get("/optimized/historical/summary")
|
|
async def get_optimized_historical_summary(
|
|
time_range: str = "24h"
|
|
):
|
|
"""Get optimized historical summary using aggregated queries"""
|
|
try:
|
|
from app.services.historical_analysis import HistoricalAnalysisService
|
|
|
|
historical_service = HistoricalAnalysisService()
|
|
summary = await historical_service.get_optimized_historical_summary(time_range)
|
|
|
|
return summary
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting optimized historical summary: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@api_router.get("/optimized/cache/stats")
|
|
async def get_cache_statistics():
|
|
"""Get cache statistics for monitoring"""
|
|
try:
|
|
from app.services.historical_analysis import HistoricalAnalysisService
|
|
|
|
historical_service = HistoricalAnalysisService()
|
|
stats = historical_service.get_cache_statistics()
|
|
|
|
return {
|
|
"cache_statistics": stats,
|
|
"timestamp": datetime.now().isoformat()
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting cache statistics: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
# ============================================================================
|
|
# CELERY BACKGROUND TASKS API
|
|
# ============================================================================
|
|
|
|
@api_router.post("/tasks/cluster/analyze")
|
|
async def start_cluster_analysis():
|
|
"""Start background cluster analysis task"""
|
|
try:
|
|
from app.tasks.cluster_analysis import analyze_cluster
|
|
|
|
# Start background task
|
|
task = analyze_cluster.delay()
|
|
|
|
return {
|
|
"task_id": task.id,
|
|
"status": "started",
|
|
"message": "Cluster analysis started in background",
|
|
"check_status_url": f"/api/v1/tasks/{task.id}/status"
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error starting cluster analysis: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@api_router.post("/tasks/namespace/{namespace}/analyze")
|
|
async def start_namespace_analysis(namespace: str):
|
|
"""Start background namespace analysis task"""
|
|
try:
|
|
from app.tasks.cluster_analysis import analyze_namespace
|
|
|
|
# Start background task
|
|
task = analyze_namespace.delay(namespace)
|
|
|
|
return {
|
|
"task_id": task.id,
|
|
"namespace": namespace,
|
|
"status": "started",
|
|
"message": f"Namespace {namespace} analysis started in background",
|
|
"check_status_url": f"/api/v1/tasks/{task.id}/status"
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error starting namespace analysis: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@api_router.post("/tasks/historical/{namespace}/{workload}")
|
|
async def start_historical_analysis(namespace: str, workload: str, time_range: str = "24h"):
|
|
"""Start background historical analysis task"""
|
|
try:
|
|
from app.tasks.prometheus_queries import query_historical_data
|
|
|
|
# Start background task
|
|
task = query_historical_data.delay(namespace, workload, time_range)
|
|
|
|
return {
|
|
"task_id": task.id,
|
|
"namespace": namespace,
|
|
"workload": workload,
|
|
"time_range": time_range,
|
|
"status": "started",
|
|
"message": f"Historical analysis for {namespace}/{workload} started in background",
|
|
"check_status_url": f"/api/v1/tasks/{task.id}/status"
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error starting historical analysis: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@api_router.post("/tasks/recommendations/generate")
|
|
async def start_recommendations_generation(cluster_data: dict):
|
|
"""Start background smart recommendations generation task"""
|
|
try:
|
|
from app.tasks.recommendations import generate_smart_recommendations
|
|
|
|
# Start background task
|
|
task = generate_smart_recommendations.delay(cluster_data)
|
|
|
|
return {
|
|
"task_id": task.id,
|
|
"status": "started",
|
|
"message": "Smart recommendations generation started in background",
|
|
"check_status_url": f"/api/v1/tasks/{task.id}/status"
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error starting recommendations generation: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@api_router.get("/tasks/{task_id}/status")
|
|
async def get_task_status(task_id: str):
|
|
"""Get task status and results"""
|
|
try:
|
|
from app.celery_app import celery_app
|
|
|
|
# Get task result
|
|
result = celery_app.AsyncResult(task_id)
|
|
|
|
if result.state == 'PENDING':
|
|
response = {
|
|
'task_id': task_id,
|
|
'state': result.state,
|
|
'status': 'Task is waiting to be processed...'
|
|
}
|
|
elif result.state == 'PROGRESS':
|
|
response = {
|
|
'task_id': task_id,
|
|
'state': result.state,
|
|
'current': result.info.get('current', 0),
|
|
'total': result.info.get('total', 1),
|
|
'status': result.info.get('status', ''),
|
|
'progress': f"{result.info.get('current', 0)}/{result.info.get('total', 1)}"
|
|
}
|
|
elif result.state == 'SUCCESS':
|
|
response = {
|
|
'task_id': task_id,
|
|
'state': result.state,
|
|
'result': result.result,
|
|
'status': 'Task completed successfully'
|
|
}
|
|
else: # FAILURE
|
|
error_info = result.info
|
|
if isinstance(error_info, dict):
|
|
error_message = error_info.get('error', str(error_info))
|
|
else:
|
|
error_message = str(error_info)
|
|
|
|
response = {
|
|
'task_id': task_id,
|
|
'state': result.state,
|
|
'error': error_message,
|
|
'status': 'Task failed'
|
|
}
|
|
|
|
return response
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting task status: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
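# Illustrative fire-and-poll workflow against these task endpoints (host and
# poll interval are assumptions; httpx is not a dependency of this module):
#
#   import asyncio
#   import httpx
#
#   async def run_cluster_analysis():
#       async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
#           started = (await client.post("/api/v1/tasks/cluster/analyze")).json()
#           task_id = started["task_id"]
#           while True:
#               status = (await client.get(f"/api/v1/tasks/{task_id}/status")).json()
#               if status["state"] in ("SUCCESS", "FAILURE"):
#                   return status
#               await asyncio.sleep(2)
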
@api_router.get("/tasks/{task_id}/result")
|
|
async def get_task_result(task_id: str):
|
|
"""Get task result (only if completed)"""
|
|
try:
|
|
from app.celery_app import celery_app
|
|
|
|
# Get task result
|
|
result = celery_app.AsyncResult(task_id)
|
|
|
|
if result.state == 'SUCCESS':
|
|
return {
|
|
'task_id': task_id,
|
|
'state': result.state,
|
|
'result': result.result
|
|
}
|
|
else:
|
|
return {
|
|
'task_id': task_id,
|
|
'state': result.state,
|
|
'message': 'Task not completed yet',
|
|
'check_status_url': f"/api/v1/tasks/{task_id}/status"
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting task result: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@api_router.delete("/tasks/{task_id}")
|
|
async def cancel_task(task_id: str):
|
|
"""Cancel a running task"""
|
|
try:
|
|
from app.celery_app import celery_app
|
|
|
|
# Revoke task
|
|
celery_app.control.revoke(task_id, terminate=True)
|
|
|
|
return {
|
|
'task_id': task_id,
|
|
'status': 'cancelled',
|
|
'message': 'Task cancelled successfully'
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error cancelling task: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
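# A note on revoke(terminate=True): it forcefully terminates the worker
# process currently executing the task (SIGTERM by default) and is only
# supported by pools such as prefork. A plain revoke() without terminate only
# prevents tasks that have not started yet from running.
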
@api_router.get("/tasks/health")
|
|
async def get_celery_health():
|
|
"""Get Celery workers health status"""
|
|
try:
|
|
from app.celery_app import celery_app
|
|
|
|
# Get active workers
|
|
inspect = celery_app.control.inspect()
|
|
active_workers = inspect.active()
|
|
stats = inspect.stats()
|
|
|
|
return {
|
|
'celery_status': 'running',
|
|
'active_workers': len(active_workers) if active_workers else 0,
|
|
'workers': active_workers,
|
|
'stats': stats,
|
|
'timestamp': datetime.now().isoformat()
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting Celery health: {e}")
|
|
return {
|
|
'celery_status': 'error',
|
|
'error': str(e),
|
|
'timestamp': datetime.now().isoformat()
|
|
}
|
|
|
|
# ============================================================================
|
|
# HYBRID APIs (Prometheus + Thanos)
|
|
# ============================================================================
|
|
|
|
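# Background for this section: Prometheus typically holds recent,
# high-resolution samples with limited retention, while Thanos fronts object
# storage to serve long-retention (and possibly downsampled) history. The
# endpoints below therefore route multi-day trend queries to ThanosClient.
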
@api_router.get("/hybrid/resource-trends")
|
|
async def get_resource_trends(days: int = 7):
|
|
"""
|
|
Get resource utilization trends using Thanos for historical data.
|
|
Combines real-time Prometheus data with historical Thanos data.
|
|
"""
|
|
try:
|
|
thanos_client = ThanosClient()
|
|
|
|
# Get historical trends from Thanos
|
|
trends = thanos_client.get_resource_utilization_trend(days)
|
|
|
|
return {
|
|
"data_source": "thanos",
|
|
"period_days": days,
|
|
"trends": trends,
|
|
"timestamp": datetime.now().isoformat()
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting resource trends: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@api_router.get("/hybrid/namespace-trends/{namespace}")
|
|
async def get_namespace_trends(namespace: str, days: int = 7):
|
|
"""
|
|
Get namespace resource trends using Thanos for historical data.
|
|
"""
|
|
try:
|
|
thanos_client = ThanosClient()
|
|
|
|
# Get namespace trends from Thanos
|
|
trends = thanos_client.get_namespace_resource_trends(namespace, days)
|
|
|
|
return {
|
|
"data_source": "thanos",
|
|
"namespace": namespace,
|
|
"period_days": days,
|
|
"trends": trends,
|
|
"timestamp": datetime.now().isoformat()
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting namespace trends: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@api_router.get("/hybrid/overcommit-trends")
|
|
async def get_overcommit_trends(days: int = 7):
|
|
"""
|
|
Get overcommit trends using Thanos for historical data.
|
|
"""
|
|
try:
|
|
thanos_client = ThanosClient()
|
|
|
|
# Get overcommit trends from Thanos
|
|
trends = thanos_client.get_overcommit_historical(days)
|
|
|
|
return {
|
|
"data_source": "thanos",
|
|
"period_days": days,
|
|
"trends": trends,
|
|
"timestamp": datetime.now().isoformat()
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting overcommit trends: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@api_router.get("/hybrid/top-workloads")
|
|
async def get_top_workloads_historical(days: int = 7, limit: int = 10):
|
|
"""
|
|
Get historical top workloads using Thanos.
|
|
"""
|
|
try:
|
|
thanos_client = ThanosClient()
|
|
|
|
# Get top workloads from Thanos
|
|
workloads = thanos_client.get_top_workloads_historical(days, limit)
|
|
|
|
return {
|
|
"data_source": "thanos",
|
|
"period_days": days,
|
|
"limit": limit,
|
|
"workloads": workloads,
|
|
"timestamp": datetime.now().isoformat()
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting top workloads: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@api_router.get("/hybrid/health")
|
|
async def get_hybrid_health():
|
|
"""
|
|
Get health status of both Prometheus and Thanos.
|
|
"""
|
|
try:
|
|
prometheus_client = PrometheusClient()
|
|
thanos_client = ThanosClient()
|
|
|
|
# Check both services
|
|
prometheus_health = prometheus_client.health_check()
|
|
thanos_health = thanos_client.health_check()
|
|
|
|
return {
|
|
"prometheus": prometheus_health,
|
|
"thanos": thanos_health,
|
|
"overall_status": "healthy" if (
|
|
prometheus_health.get("status") == "healthy" and
|
|
thanos_health.get("status") == "healthy"
|
|
) else "degraded",
|
|
"timestamp": datetime.now().isoformat()
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error checking hybrid health: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|