"""
|
|
Resource validation service following Red Hat best practices
|
|
"""
|
|
import logging
|
|
from typing import List, Dict, Any, Optional
|
|
from decimal import Decimal, InvalidOperation
|
|
import re
|
|
|
|
from app.models.resource_models import (
|
|
PodResource,
|
|
ResourceValidation,
|
|
NamespaceResources,
|
|
QoSClassification,
|
|
ResourceQuota,
|
|
ClusterHealth,
|
|
PodHealthScore,
|
|
SimplifiedValidation
|
|
)
|
|
from app.core.config import settings
|
|
from app.services.historical_analysis import HistoricalAnalysisService
|
|
from app.services.smart_recommendations import SmartRecommendationsService
|
|
|
|
logger = logging.getLogger(__name__)


class ValidationService:
    """Service for resource validation"""

    def __init__(self):
        self.cpu_ratio = settings.cpu_limit_ratio
        self.memory_ratio = settings.memory_limit_ratio
        self.min_cpu_request = settings.min_cpu_request
        self.min_memory_request = settings.min_memory_request
        self.historical_analysis = HistoricalAnalysisService()
        self.smart_recommendations = SmartRecommendationsService()

    def validate_pod_resources(self, pod: PodResource) -> List[ResourceValidation]:
        """Validate pod resources"""
        validations = []

        for container in pod.containers:
            container_validations = self._validate_container_resources(
                pod.name, pod.namespace, container
            )
            validations.extend(container_validations)

        return validations

    async def validate_pod_resources_with_historical_analysis(
        self,
        pod: PodResource,
        time_range: str = '24h'
    ) -> List[ResourceValidation]:
        """Validate pod resources including historical analysis"""
        # Static validations
        static_validations = self.validate_pod_resources(pod)

        # Historical analysis
        try:
            historical_validations = await self.historical_analysis.analyze_pod_historical_usage(
                pod, time_range
            )
            static_validations.extend(historical_validations)
        except Exception as e:
            logger.warning(f"Error in historical analysis for pod {pod.name}: {e}")

        return static_validations

    def _validate_container_resources(
        self,
        pod_name: str,
        namespace: str,
        container: Any
    ) -> List[ResourceValidation]:
        """Validate container resources"""
        validations = []
        resources = container["resources"]
        requests = resources.get("requests", {})
        limits = resources.get("limits", {})

        # Determine QoS class based on Red Hat best practices
        qos_class = self._determine_qos_class(requests, limits)

        # 1. Check if requests are defined
        if not requests:
            validations.append(ResourceValidation(
                pod_name=pod_name,
                namespace=namespace,
                container_name=container["name"],
                validation_type="missing_requests",
                severity="error",
                message="Container without defined requests",
                recommendation="Define CPU and memory requests to guarantee QoS (currently BestEffort class)"
            ))

        # 2. Check if limits are defined
        if not limits:
            validations.append(ResourceValidation(
                pod_name=pod_name,
                namespace=namespace,
                container_name=container["name"],
                validation_type="missing_limits",
                severity="warning",
                message="Container without defined limits",
                recommendation="Define limits to avoid excessive resource consumption"
            ))

        # 3. QoS class validation based on Red Hat recommendations
        qos_validation = self._validate_qos_class(
            pod_name, namespace, container["name"], qos_class, requests, limits
        )
        if qos_validation:
            validations.append(qos_validation)

        # 4. Validate limit:request ratio
        if requests and limits:
            cpu_validation = self._validate_cpu_ratio(
                pod_name, namespace, container["name"], requests, limits
            )
            if cpu_validation:
                validations.append(cpu_validation)

            memory_validation = self._validate_memory_ratio(
                pod_name, namespace, container["name"], requests, limits
            )
            if memory_validation:
                validations.append(memory_validation)

        # 5. Add container resource metrics validation
        if requests or limits:
            metrics_validation = self._validate_container_metrics(
                pod_name, namespace, container["name"], requests, limits
            )
            if metrics_validation:
                validations.append(metrics_validation)

        # 6. Validate minimum values
        if requests:
            min_validation = self._validate_minimum_values(
                pod_name, namespace, container["name"], requests
            )
            validations.extend(min_validation)

        return validations
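
    # The container argument is accessed as a plain dict above (container["name"],
    # container["resources"]). A minimal sketch of the expected shape, assumed from
    # those accesses rather than taken from the PodResource model definition:
    #
    #   {
    #       "name": "app",
    #       "resources": {
    #           "requests": {"cpu": "100m", "memory": "128Mi"},
    #           "limits": {"cpu": "500m", "memory": "512Mi"},
    #       },
    #   }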

    def _validate_cpu_ratio(
        self,
        pod_name: str,
        namespace: str,
        container_name: str,
        requests: Dict[str, str],
        limits: Dict[str, str]
    ) -> Optional[ResourceValidation]:
        """Validate CPU limit:request ratio"""
        if "cpu" not in requests or "cpu" not in limits:
            return None

        try:
            request_value = self._parse_cpu_value(requests["cpu"])
            limit_value = self._parse_cpu_value(limits["cpu"])

            if request_value > 0:
                ratio = limit_value / request_value

                if ratio > self.cpu_ratio:  # No excessive tolerance
                    return ResourceValidation(
                        pod_name=pod_name,
                        namespace=namespace,
                        container_name=container_name,
                        validation_type="invalid_ratio",
                        severity="warning",
                        message=f"CPU limit:request ratio too high ({ratio:.2f}:1) - Request: {requests['cpu']}, Limit: {limits['cpu']}",
                        recommendation=f"Consider reducing limits or increasing requests (recommended ratio: {self.cpu_ratio}:1)"
                    )
                elif ratio < 1.0:
                    return ResourceValidation(
                        pod_name=pod_name,
                        namespace=namespace,
                        container_name=container_name,
                        validation_type="invalid_ratio",
                        severity="error",
                        message=f"CPU limit less than request ({ratio:.2f}:1) - Request: {requests['cpu']}, Limit: {limits['cpu']}",
                        recommendation="CPU limit should be greater than or equal to request"
                    )

        except (ValueError, InvalidOperation) as e:
            logger.warning(f"Error validating CPU ratio: {e}")

        return None
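
    # Worked example (assuming settings.cpu_limit_ratio is 3): a container with
    # requests.cpu = "500m" and limits.cpu = "2" parses to 0.5 and 2.0 cores,
    # giving a 4.00:1 ratio, which exceeds 3:1 and yields a "warning" validation.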

    def _validate_memory_ratio(
        self,
        pod_name: str,
        namespace: str,
        container_name: str,
        requests: Dict[str, str],
        limits: Dict[str, str]
    ) -> Optional[ResourceValidation]:
        """Validate memory limit:request ratio"""
        if "memory" not in requests or "memory" not in limits:
            return None

        try:
            request_value = self._parse_memory_value(requests["memory"])
            limit_value = self._parse_memory_value(limits["memory"])

            if request_value > 0:
                ratio = limit_value / request_value

                if ratio > self.memory_ratio:  # No excessive tolerance
                    return ResourceValidation(
                        pod_name=pod_name,
                        namespace=namespace,
                        container_name=container_name,
                        validation_type="invalid_ratio",
                        severity="warning",
                        message=f"Memory limit:request ratio too high ({ratio:.2f}:1) - Request: {requests['memory']}, Limit: {limits['memory']}",
                        recommendation=f"Consider reducing limits or increasing requests (recommended ratio: {self.memory_ratio}:1)"
                    )
                elif ratio < 1.0:
                    return ResourceValidation(
                        pod_name=pod_name,
                        namespace=namespace,
                        container_name=container_name,
                        validation_type="invalid_ratio",
                        severity="error",
                        message=f"Memory limit less than request ({ratio:.2f}:1) - Request: {requests['memory']}, Limit: {limits['memory']}",
                        recommendation="Memory limit should be greater than or equal to request"
                    )

        except (ValueError, InvalidOperation) as e:
            logger.warning(f"Error validating memory ratio: {e}")

        return None

    def _validate_container_metrics(
        self,
        pod_name: str,
        namespace: str,
        container_name: str,
        requests: Dict[str, str],
        limits: Dict[str, str]
    ) -> Optional[ResourceValidation]:
        """Show container resource metrics and analysis"""
        try:
            # Parse CPU values
            cpu_request = requests.get("cpu", "0")
            cpu_limit = limits.get("cpu", "0")
            cpu_request_parsed = self._parse_cpu_value(cpu_request)
            cpu_limit_parsed = self._parse_cpu_value(cpu_limit)

            # Parse memory values
            memory_request = requests.get("memory", "0")
            memory_limit = limits.get("memory", "0")
            memory_request_parsed = self._parse_memory_value(memory_request)
            memory_limit_parsed = self._parse_memory_value(memory_limit)

            # Calculate ratios
            cpu_ratio = cpu_limit_parsed / cpu_request_parsed if cpu_request_parsed > 0 else 0
            memory_ratio = memory_limit_parsed / memory_request_parsed if memory_request_parsed > 0 else 0

            # Format values for display
            cpu_request_display = f"{cpu_request_parsed:.1f} cores" if cpu_request_parsed >= 1.0 else f"{cpu_request_parsed * 1000:.0f}m"
            cpu_limit_display = f"{cpu_limit_parsed:.1f} cores" if cpu_limit_parsed >= 1.0 else f"{cpu_limit_parsed * 1000:.0f}m"

            memory_request_display = f"{memory_request_parsed / (1024*1024*1024):.1f} GiB" if memory_request_parsed >= 1024*1024*1024 else f"{memory_request_parsed / (1024*1024):.0f} MiB"
            memory_limit_display = f"{memory_limit_parsed / (1024*1024*1024):.1f} GiB" if memory_limit_parsed >= 1024*1024*1024 else f"{memory_limit_parsed / (1024*1024):.0f} MiB"

            # Create detailed message
            message = f"Container Resources - CPU: {cpu_request_display}→{cpu_limit_display} (ratio: {cpu_ratio:.1f}:1), Memory: {memory_request_display}→{memory_limit_display} (ratio: {memory_ratio:.1f}:1)"

            # Create recommendation based on ratios
            recommendations = []
            if cpu_ratio > self.cpu_ratio:
                recommendations.append(f"CPU ratio {cpu_ratio:.1f}:1 exceeds recommended {self.cpu_ratio}:1")
            if memory_ratio > self.memory_ratio:
                recommendations.append(f"Memory ratio {memory_ratio:.1f}:1 exceeds recommended {self.memory_ratio}:1")

            recommendation = "; ".join(recommendations) if recommendations else f"Resource allocation within recommended ratios (CPU: {self.cpu_ratio}:1, Memory: {self.memory_ratio}:1)"

            return ResourceValidation(
                pod_name=pod_name,
                namespace=namespace,
                container_name=container_name,
                validation_type="container_metrics",
                severity="info",
                message=message,
                recommendation=recommendation
            )

        except Exception as e:
            logger.warning(f"Error validating container metrics: {e}")
            return None

    def _validate_minimum_values(
        self,
        pod_name: str,
        namespace: str,
        container_name: str,
        requests: Dict[str, str]
    ) -> List[ResourceValidation]:
        """Validate minimum request values"""
        validations = []

        # Validate minimum CPU
        if "cpu" in requests:
            try:
                request_value = self._parse_cpu_value(requests["cpu"])
                min_value = self._parse_cpu_value(self.min_cpu_request)

                if request_value < min_value:
                    validations.append(ResourceValidation(
                        pod_name=pod_name,
                        namespace=namespace,
                        container_name=container_name,
                        validation_type="minimum_value",
                        severity="warning",
                        message=f"CPU request too low ({requests['cpu']})",
                        recommendation=f"Consider increasing to at least {self.min_cpu_request}"
                    ))
            except (ValueError, InvalidOperation):
                pass

        # Validate minimum memory
        if "memory" in requests:
            try:
                request_value = self._parse_memory_value(requests["memory"])
                min_value = self._parse_memory_value(self.min_memory_request)

                if request_value < min_value:
                    validations.append(ResourceValidation(
                        pod_name=pod_name,
                        namespace=namespace,
                        container_name=container_name,
                        validation_type="minimum_value",
                        severity="warning",
                        message=f"Memory request too low ({requests['memory']})",
                        recommendation=f"Consider increasing to at least {self.min_memory_request}"
                    ))
            except (ValueError, InvalidOperation):
                pass

        return validations

    def _parse_cpu_value(self, value: str) -> float:
        """Convert CPU value to float (cores)"""
        if value.endswith('m'):
            return float(value[:-1]) / 1000
        elif value.endswith('n'):
            return float(value[:-1]) / 1000000000
        else:
            return float(value)
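
    # Examples for the parser above: "500m" (millicores) -> 0.5,
    # "250000000n" (nanocores, as reported by the metrics API) -> 0.25, "2" -> 2.0 cores.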

    def _parse_memory_value(self, value: str) -> int:
        """Convert memory value to bytes"""
        value = value.upper()

        if value.endswith('KI'):
            return int(float(value[:-2]) * 1024)
        elif value.endswith('MI'):
            return int(float(value[:-2]) * 1024 * 1024)
        elif value.endswith('GI'):
            return int(float(value[:-2]) * 1024 * 1024 * 1024)
        elif value.endswith('K'):
            return int(float(value[:-1]) * 1000)
        elif value.endswith('M'):
            return int(float(value[:-1]) * 1000 * 1000)
        elif value.endswith('G'):
            return int(float(value[:-1]) * 1000 * 1000 * 1000)
        else:
            return int(value)
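
    # Examples: "128Mi" -> 134217728 bytes, "1Gi" -> 1073741824, "1G" -> 1000000000,
    # and a plain "1048576" -> 1048576. Binary suffixes (Ki/Mi/Gi) use powers of 1024
    # and decimal suffixes (K/M/G) use powers of 1000, matching Kubernetes quantity notation.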

    def _determine_qos_class(self, requests: Dict[str, str], limits: Dict[str, str]) -> str:
        """Determine QoS class based on requests and limits"""
        cpu_requests = self._parse_cpu_value(requests.get("cpu", "0"))
        memory_requests = self._parse_memory_value(requests.get("memory", "0")) / (1024 * 1024 * 1024)  # Convert to GiB
        cpu_limits = self._parse_cpu_value(limits.get("cpu", "0"))
        memory_limits = self._parse_memory_value(limits.get("memory", "0")) / (1024 * 1024 * 1024)  # Convert to GiB

        # Guaranteed: both CPU and memory requests and limits are set and equal
        if (cpu_requests > 0 and memory_requests > 0 and
                cpu_requests == cpu_limits and memory_requests == memory_limits):
            return "Guaranteed"

        # Burstable: at least one request is set
        elif cpu_requests > 0 or memory_requests > 0:
            return "Burstable"

        # BestEffort: no requests set
        else:
            return "BestEffort"

    def _validate_qos_class(self, pod_name: str, namespace: str, container_name: str, qos_class: str, requests: Dict[str, str], limits: Dict[str, str]) -> Optional[ResourceValidation]:
        """Validate QoS class and provide recommendations"""
        cpu_requests = self._parse_cpu_value(requests.get("cpu", "0"))
        memory_requests = self._parse_memory_value(requests.get("memory", "0")) / (1024 * 1024 * 1024)  # Convert to GiB
        cpu_limits = self._parse_cpu_value(limits.get("cpu", "0"))
        memory_limits = self._parse_memory_value(limits.get("memory", "0")) / (1024 * 1024 * 1024)  # Convert to GiB

        # Missing requests (BestEffort pods) are already flagged at container level
        # in _validate_container_resources, so that check is not duplicated here.

        # Check for missing limits (Burstable pods)
        if qos_class == "Burstable" and (cpu_limits == 0 or memory_limits == 0):
            return ResourceValidation(
                pod_name=pod_name,
                namespace=namespace,
                container_name=container_name,
                validation_type="missing_limits",
                severity="warning",
                message="Pod has requests but no limits defined",
                recommendation="Define resource limits to prevent resource starvation",
                priority_score=5,
                workload_category="established",
                estimated_impact="low"
            )

        return None

    def validate_namespace_overcommit(
        self,
        namespace_resources: NamespaceResources,
        node_capacity: Dict[str, str]
    ) -> List[ResourceValidation]:
        """Validate overcommit in a namespace"""
        validations = []

        # Calculate total namespace requests
        total_cpu_requests = self._parse_cpu_value(namespace_resources.total_cpu_requests)
        total_memory_requests = self._parse_memory_value(namespace_resources.total_memory_requests)

        # Calculate total node capacity
        total_cpu_capacity = self._parse_cpu_value(node_capacity.get("cpu", "0"))
        total_memory_capacity = self._parse_memory_value(node_capacity.get("memory", "0"))

        # Check CPU overcommit
        if total_cpu_capacity > 0:
            cpu_utilization = (total_cpu_requests / total_cpu_capacity) * 100
            if cpu_utilization > 100:
                validations.append(ResourceValidation(
                    pod_name="namespace",
                    namespace=namespace_resources.name,
                    container_name="all",
                    validation_type="overcommit",
                    severity="critical",
                    message=f"CPU overcommit in namespace: {cpu_utilization:.1f}%",
                    recommendation="Reduce CPU requests or add more nodes to the cluster"
                ))

        # Check memory overcommit
        if total_memory_capacity > 0:
            memory_utilization = (total_memory_requests / total_memory_capacity) * 100
            if memory_utilization > 100:
                validations.append(ResourceValidation(
                    pod_name="namespace",
                    namespace=namespace_resources.name,
                    container_name="all",
                    validation_type="overcommit",
                    severity="critical",
                    message=f"Memory overcommit in namespace: {memory_utilization:.1f}%",
                    recommendation="Reduce memory requests or add more nodes to the cluster"
                ))

        return validations
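
    # Worked example: if a namespace requests a total of "30" CPU against a node
    # capacity of "24", utilization is (30 / 24) * 100 = 125%, which is above 100%
    # and therefore reported as a critical "overcommit" validation.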

    def generate_recommendations(self, validations: List[ResourceValidation]) -> List[str]:
        """Generate recommendations based on validations"""
        recommendations = []

        # Group validations by type
        validation_counts = {}
        for validation in validations:
            validation_type = validation.validation_type
            if validation_type not in validation_counts:
                validation_counts[validation_type] = 0
            validation_counts[validation_type] += 1

        # Generate recommendations based on found issues
        if validation_counts.get("missing_requests", 0) > 0:
            recommendations.append(
                f"Implement LimitRange in namespace to define default requests "
                f"({validation_counts['missing_requests']} containers without requests)"
            )

        if validation_counts.get("missing_limits", 0) > 0:
            recommendations.append(
                f"Define limits for {validation_counts['missing_limits']} containers "
                "to avoid excessive resource consumption"
            )

        if validation_counts.get("invalid_ratio", 0) > 0:
            recommendations.append(
                f"Adjust limit:request ratio for {validation_counts['invalid_ratio']} containers "
                f"(recommended: {self.cpu_ratio}:1)"
            )

        if validation_counts.get("overcommit", 0) > 0:
            recommendations.append(
                f"Resolve overcommit in {validation_counts['overcommit']} namespaces "
                "to avoid performance issues"
            )

        return recommendations

    async def validate_pod_resources_with_categorization(
        self,
        pod: PodResource,
        workload_category: Optional[str] = None,
        priority_score: Optional[int] = None
    ) -> List[ResourceValidation]:
        """Validate pod resources with enhanced categorization and scoring"""
        validations = self.validate_pod_resources(pod)

        # Add categorization and scoring to validations
        for validation in validations:
            validation.workload_category = workload_category
            validation.priority_score = priority_score or self._calculate_priority_score(validation)
            validation.estimated_impact = self._determine_impact(validation.priority_score)

        return validations

    async def validate_pod_resources_with_smart_analysis(
        self,
        pod: PodResource,
        time_range: str = '24h'
    ) -> List[ResourceValidation]:
        """Validate pod resources with smart analysis including historical data"""
        # Static validations
        static_validations = self.validate_pod_resources(pod)

        # Get workload category (may be None if categorization returns nothing)
        workload_category = await self._categorize_workload(pod)
        category_name = workload_category.category if workload_category else None

        # Get smart recommendations
        smart_recommendations = await self.smart_recommendations.generate_smart_recommendations([pod], [workload_category])

        # Enhance validations with smart analysis
        enhanced_validations = []
        for validation in static_validations:
            validation.workload_category = category_name
            validation.priority_score = self._calculate_priority_score(validation)
            validation.estimated_impact = self._determine_impact(validation.priority_score)
            enhanced_validations.append(validation)

        # Add smart recommendations as validations
        for recommendation in smart_recommendations:
            smart_validation = ResourceValidation(
                pod_name=pod.name,
                namespace=pod.namespace,
                container_name="workload",
                validation_type="smart_recommendation",
                severity=recommendation.priority,
                message=recommendation.title,
                recommendation=recommendation.description,
                priority_score=self._get_priority_score_from_string(recommendation.priority),
                workload_category=category_name,
                estimated_impact=recommendation.estimated_impact
            )
            enhanced_validations.append(smart_validation)

        return enhanced_validations

    async def _categorize_workload(self, pod: PodResource) -> Any:
        """Categorize a single workload"""
        categories = await self.smart_recommendations.categorize_workloads([pod])
        return categories[0] if categories else None

    def _get_priority_score_from_string(self, priority: str) -> int:
        """Convert priority string to numeric score"""
        priority_map = {
            "critical": 10,
            "high": 8,
            "medium": 5,
            "low": 2
        }
        return priority_map.get(priority, 5)

    def _calculate_priority_score(self, validation: ResourceValidation) -> int:
        """Calculate priority score for validation (1-10)"""
        score = 1

        # Base score by severity
        if validation.severity == "critical":
            score += 4
        elif validation.severity == "error":
            score += 3
        elif validation.severity == "warning":
            score += 1

        # Add score by validation type
        if validation.validation_type == "missing_requests":
            score += 3
        elif validation.validation_type == "missing_limits":
            score += 2
        elif validation.validation_type == "invalid_ratio":
            score += 1
        elif validation.validation_type == "overcommit":
            score += 4

        # Add score for production namespaces
        if validation.namespace in ["default", "production", "prod"]:
            score += 2

        return min(score, 10)
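
    # Worked example: a "missing_requests" validation with severity "error" in the
    # "production" namespace scores 1 + 3 (error) + 3 (missing_requests) + 2 (prod) = 9.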

    def _determine_impact(self, priority_score: int) -> str:
        """Determine estimated impact based on priority score"""
        if priority_score >= 8:
            return "critical"
        elif priority_score >= 6:
            return "high"
        elif priority_score >= 4:
            return "medium"
        else:
            return "low"

    async def get_workload_categories(self, pods: List[PodResource]) -> List[Any]:
        """Get workload categories for all pods"""
        return await self.smart_recommendations.categorize_workloads(pods)

    async def get_smart_recommendations(self, pods: List[PodResource]) -> List[Any]:
        """Get smart recommendations for all workloads"""
        categories = await self.get_workload_categories(pods)
        return await self.smart_recommendations.generate_smart_recommendations(pods, categories)

    def classify_qos(self, pod: PodResource) -> QoSClassification:
        """Classify pod QoS based on Red Hat best practices"""
        cpu_requests = pod.cpu_requests
        memory_requests = pod.memory_requests
        cpu_limits = pod.cpu_limits
        memory_limits = pod.memory_limits

        # Determine QoS class
        if (cpu_requests > 0 and memory_requests > 0 and
                cpu_limits > 0 and memory_limits > 0 and
                cpu_requests == cpu_limits and memory_requests == memory_limits):
            qos_class = "Guaranteed"
            efficiency_score = 1.0
        elif (cpu_requests > 0 or memory_requests > 0):
            qos_class = "Burstable"
            # Calculate efficiency based on request/limit ratio
            cpu_efficiency = cpu_requests / cpu_limits if cpu_limits > 0 else 0.5
            memory_efficiency = memory_requests / memory_limits if memory_limits > 0 else 0.5
            efficiency_score = (cpu_efficiency + memory_efficiency) / 2
        else:
            qos_class = "BestEffort"
            efficiency_score = 0.0

        # Generate recommendation
        recommendation = None
        if qos_class == "BestEffort":
            recommendation = "Define CPU and memory requests for better resource management"
        elif qos_class == "Burstable" and efficiency_score < 0.3:
            recommendation = "Consider setting limits closer to requests for better predictability"
        elif qos_class == "Guaranteed":
            recommendation = "Optimal QoS configuration for production workloads"

        return QoSClassification(
            pod_name=pod.name,
            namespace=pod.namespace,
            qos_class=qos_class,
            cpu_requests=cpu_requests,
            memory_requests=memory_requests,
            cpu_limits=cpu_limits,
            memory_limits=memory_limits,
            efficiency_score=efficiency_score,
            recommendation=recommendation
        )
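
    # Worked example: a Burstable pod with requests of 0.5 cores / 1 GiB and limits of
    # 2 cores / 4 GiB has cpu_efficiency 0.25 and memory_efficiency 0.25, so
    # efficiency_score = 0.25 (< 0.3) and the "limits closer to requests" hint fires.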

    async def analyze_resource_quotas(self, namespaces: List[str]) -> List[ResourceQuota]:
        """Analyze Resource Quotas for namespaces"""
        quotas = []

        for namespace in namespaces:
            # This would typically query the Kubernetes API
            # For now, we'll simulate the analysis
            quota = ResourceQuota(
                namespace=namespace,
                name=f"quota-{namespace}",
                status="Missing",  # Would be determined by API call
                usage_percentage=0.0,
                recommended_quota={
                    "cpu": "2000m",
                    "memory": "8Gi",
                    "pods": "20"
                }
            )
            quotas.append(quota)

        return quotas

    async def _get_cluster_capacity(self) -> tuple[float, float, int]:
        """Get real cluster capacity from nodes"""
        try:
            from kubernetes import client
            v1 = client.CoreV1Api()
            nodes = v1.list_node()

            total_cpu_cores = 0.0
            total_memory_bytes = 0.0
            total_nodes = len(nodes.items)

            for node in nodes.items:
                # Parse CPU capacity
                cpu_capacity = node.status.capacity.get("cpu", "0")
                total_cpu_cores += self._parse_cpu_value(cpu_capacity)

                # Parse memory capacity
                memory_capacity = node.status.capacity.get("memory", "0")
                total_memory_bytes += self._parse_memory_value(memory_capacity)

            # Convert memory to GiB
            total_memory_gib = total_memory_bytes / (1024 * 1024 * 1024)

            return total_cpu_cores, total_memory_gib, total_nodes

        except Exception as e:
            logger.warning(f"Could not get real cluster capacity: {e}. Using fallback values.")
            # Fallback values based on a typical OpenShift cluster
            return 24.0, 70.0, 6
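
    # Note: client.CoreV1Api() relies on the kubernetes client configuration having
    # been loaded elsewhere (e.g. kubernetes.config.load_incluster_config() or
    # load_kube_config()); otherwise the API call fails and the fallback values are used.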

    async def get_cluster_health(self, pods: List[PodResource]) -> ClusterHealth:
        """Get cluster health overview with overcommit analysis"""
        total_pods = len(pods)
        total_namespaces = len(set(pod.namespace for pod in pods))

        # Calculate cluster resource totals
        cluster_cpu_requests = sum(pod.cpu_requests for pod in pods)
        cluster_memory_requests = sum(pod.memory_requests for pod in pods)
        cluster_cpu_limits = sum(pod.cpu_limits for pod in pods)
        cluster_memory_limits = sum(pod.memory_limits for pod in pods)

        # Get real cluster capacity
        cluster_cpu_capacity, cluster_memory_capacity, total_nodes = await self._get_cluster_capacity()

        # Calculate overcommit percentages
        cpu_overcommit = (cluster_cpu_requests / cluster_cpu_capacity) * 100
        # Convert memory capacity from GiB to bytes for consistent calculation
        cluster_memory_capacity_bytes = cluster_memory_capacity * (1024 * 1024 * 1024)
        memory_overcommit = (cluster_memory_requests / cluster_memory_capacity_bytes) * 100

        # Determine overall health
        if cpu_overcommit > 150 or memory_overcommit > 150:
            overall_health = "Critical"
        elif cpu_overcommit > 120 or memory_overcommit > 120:
            overall_health = "Warning"
        else:
            overall_health = "Healthy"

        # Count critical issues
        critical_issues = sum(1 for pod in pods if pod.cpu_requests == 0 or pod.memory_requests == 0)

        # Get top resource consumers
        # Note: the sort key adds CPU cores to memory bytes, so memory dominates the
        # ordering in practice.
        top_consumers = sorted(
            pods,
            key=lambda p: p.cpu_requests + p.memory_requests,
            reverse=True
        )[:10]

        # QoS distribution
        qos_distribution = {"Guaranteed": 0, "Burstable": 0, "BestEffort": 0}
        for pod in pods:
            qos = self.classify_qos(pod)
            qos_distribution[qos.qos_class] += 1

        return ClusterHealth(
            total_pods=total_pods,
            total_namespaces=total_namespaces,
            total_nodes=total_nodes,
            cluster_cpu_capacity=cluster_cpu_capacity,
            cluster_memory_capacity=cluster_memory_capacity,
            cluster_cpu_requests=cluster_cpu_requests,
            cluster_memory_requests=cluster_memory_requests,
            cluster_cpu_limits=cluster_cpu_limits,
            cluster_memory_limits=cluster_memory_limits,
            cpu_overcommit_percentage=cpu_overcommit,
            memory_overcommit_percentage=memory_overcommit,
            overall_health=overall_health,
            critical_issues=critical_issues,
            namespaces_in_overcommit=len([ns for ns in set(pod.namespace for pod in pods) if self._is_namespace_in_overcommit(ns, pods)]),
            top_resource_consumers=[
                {
                    "name": pod.name,
                    "namespace": pod.namespace,
                    "cpu_requests": pod.cpu_requests,
                    "memory_requests": pod.memory_requests,
                    "qos_class": self.classify_qos(pod).qos_class
                }
                for pod in top_consumers
            ],
            qos_distribution=qos_distribution,
            resource_quota_coverage=self._calculate_resource_quota_coverage(pods)
        )

    def _is_namespace_in_overcommit(self, namespace: str, pods: List[PodResource]) -> bool:
        """Check if namespace is in overcommit"""
        namespace_pods = [pod for pod in pods if pod.namespace == namespace]
        if not namespace_pods:
            return False

        # Simple overcommit check: if any pod has limits > requests
        for pod in namespace_pods:
            if pod.cpu_limits > pod.cpu_requests or pod.memory_limits > pod.memory_requests:
                return True
        return False

    def _calculate_resource_quota_coverage(self, pods: List[PodResource]) -> float:
        """Calculate resource quota coverage percentage"""
        namespaces = set(pod.namespace for pod in pods)
        if not namespaces:
            return 0.0

        # For now, return a simple calculation based on namespace count
        # In a real implementation, this would check actual ResourceQuota objects
        return min(len(namespaces) * 0.2, 1.0)  # 20% per namespace, max 100%

    def calculate_pod_health_score(self, pod: PodResource, validations: List[ResourceValidation]) -> PodHealthScore:
        """Calculate pod health score and create simplified display"""
        # Calculate health score (0-10)
        health_score = 10

        # Deduct points for issues
        for validation in validations:
            if validation.severity == "critical":
                health_score -= 3
            elif validation.severity == "error":
                health_score -= 2
            elif validation.severity == "warning":
                health_score -= 1

        # Ensure score is between 0-10
        health_score = max(0, min(10, health_score))

        # Determine health status and visual indicators
        if health_score >= 9:
            health_status = "Excellent"
            status_color = "green"
            status_icon = "✅"
        elif health_score >= 7:
            health_status = "Good"
            status_color = "green"
            status_icon = "✅"
        elif health_score >= 5:
            health_status = "Medium"
            status_color = "yellow"
            status_icon = "🟡"
        elif health_score >= 3:
            health_status = "Poor"
            status_color = "orange"
            status_icon = "🟠"
        else:
            health_status = "Critical"
            status_color = "red"
            status_icon = "🔴"

        # Create simplified resource display
        cpu_display, cpu_status = self._create_cpu_display(pod)
        memory_display, memory_status = self._create_memory_display(pod)

        # Group validations by severity
        critical_issues = []
        warnings = []
        info_items = []

        for validation in validations:
            if validation.severity == "critical":
                critical_issues.append(validation.message)
            elif validation.severity in ["error", "warning"]:
                warnings.append(validation.message)
            else:
                info_items.append(validation.message)

        # Determine available actions
        available_actions = self._determine_available_actions(validations)
        oc_commands = self._generate_oc_commands(pod, validations)

        return PodHealthScore(
            pod_name=pod.name,
            namespace=pod.namespace,
            health_score=health_score,
            health_status=health_status,
            status_color=status_color,
            status_icon=status_icon,
            cpu_display=cpu_display,
            memory_display=memory_display,
            cpu_status=cpu_status,
            memory_status=memory_status,
            critical_issues=critical_issues,
            warnings=warnings,
            info_items=info_items,
            available_actions=available_actions,
            oc_commands=oc_commands
        )
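
    # Worked example: one "error" and two "warning" validations deduct 2 + 1 + 1 = 4
    # points, giving a health score of 6, which maps to "Medium" (🟡).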

    def _create_cpu_display(self, pod: PodResource) -> tuple[str, str]:
        """Create CPU display string and status"""
        if pod.cpu_requests == 0 and pod.cpu_limits == 0:
            return "No CPU resources defined", "🔴"

        # Format CPU values
        cpu_req_str = self._format_cpu_value(pod.cpu_requests)
        cpu_lim_str = self._format_cpu_value(pod.cpu_limits)

        # Calculate ratio
        if pod.cpu_requests > 0:
            ratio = pod.cpu_limits / pod.cpu_requests
            ratio_str = f"({ratio:.1f}:1 ratio)"
        else:
            ratio_str = "(no requests)"

        display = f"{cpu_req_str} → {cpu_lim_str} {ratio_str}"

        # Determine status
        if pod.cpu_requests == 0:
            status = "🔴"  # No requests
        elif pod.cpu_limits == 0:
            status = "🟡"  # No limits
        elif pod.cpu_requests > 0 and pod.cpu_limits > 0:
            ratio = pod.cpu_limits / pod.cpu_requests
            if ratio > 5:
                status = "🔴"  # Very high ratio
            elif ratio > 3:
                status = "🟡"  # High ratio
            else:
                status = "✅"  # Good ratio
        else:
            status = "🔴"

        return display, status

    def _create_memory_display(self, pod: PodResource) -> tuple[str, str]:
        """Create memory display string and status"""
        if pod.memory_requests == 0 and pod.memory_limits == 0:
            return "No memory resources defined", "🔴"

        # Format memory values
        mem_req_str = self._format_memory_value(pod.memory_requests)
        mem_lim_str = self._format_memory_value(pod.memory_limits)

        # Calculate ratio
        if pod.memory_requests > 0:
            ratio = pod.memory_limits / pod.memory_requests
            ratio_str = f"({ratio:.1f}:1 ratio)"
        else:
            ratio_str = "(no requests)"

        display = f"{mem_req_str} → {mem_lim_str} {ratio_str}"

        # Determine status
        if pod.memory_requests == 0:
            status = "🔴"  # No requests
        elif pod.memory_limits == 0:
            status = "🟡"  # No limits
        elif pod.memory_requests > 0 and pod.memory_limits > 0:
            ratio = pod.memory_limits / pod.memory_requests
            if ratio > 5:
                status = "🔴"  # Very high ratio
            elif ratio > 3:
                status = "🟡"  # High ratio
            else:
                status = "✅"  # Good ratio
        else:
            status = "🔴"

        return display, status

    def _format_cpu_value(self, value: float) -> str:
        """Format CPU value for display"""
        if value >= 1.0:
            return f"{value:.1f} cores"
        else:
            return f"{int(value * 1000)}m"

    def _format_memory_value(self, value_bytes: float) -> str:
        """Format memory value for display"""
        if value_bytes >= 1024 * 1024 * 1024:  # >= 1 GiB
            return f"{value_bytes / (1024 * 1024 * 1024):.1f} GiB"
        else:
            return f"{int(value_bytes / (1024 * 1024))} MiB"

    def _determine_available_actions(self, validations: List[ResourceValidation]) -> List[str]:
        """Determine available actions based on validations"""
        actions = []

        for validation in validations:
            if validation.validation_type == "missing_requests":
                actions.append("add_requests")
            elif validation.validation_type == "missing_limits":
                actions.append("add_limits")
            elif validation.validation_type == "invalid_ratio":
                # Ratio problems are emitted with type "invalid_ratio"; the message
                # indicates whether CPU or memory is affected.
                if "CPU" in validation.message:
                    actions.append("fix_cpu_ratio")
                else:
                    actions.append("fix_memory_ratio")

        return list(set(actions))  # Remove duplicates

    def _generate_oc_commands(self, pod: PodResource, validations: List[ResourceValidation]) -> List[str]:
        """Generate oc commands for fixing issues"""
        commands = []

        # Generate commands for each validation
        for validation in validations:
            if validation.validation_type == "missing_requests":
                cmd = self._generate_add_requests_command(pod, validation)
                if cmd:
                    commands.append(cmd)
            elif validation.validation_type == "missing_limits":
                cmd = self._generate_add_limits_command(pod, validation)
                if cmd:
                    commands.append(cmd)
            elif validation.validation_type == "invalid_ratio":
                cmd = self._generate_fix_ratio_command(pod, validation)
                if cmd:
                    commands.append(cmd)

        return commands

    def _generate_add_requests_command(self, pod: PodResource, validation: ResourceValidation) -> str:
        """Generate oc command to add requests"""
        # This would need to be implemented based on the specific container
        return f"oc patch pod {pod.name} -n {pod.namespace} --type='merge' -p='{{\"spec\":{{\"containers\":[{{\"name\":\"{validation.container_name}\",\"resources\":{{\"requests\":{{\"cpu\":\"100m\",\"memory\":\"128Mi\"}}}}}}]}}}}'"

    def _generate_add_limits_command(self, pod: PodResource, validation: ResourceValidation) -> str:
        """Generate oc command to add limits"""
        return f"oc patch pod {pod.name} -n {pod.namespace} --type='merge' -p='{{\"spec\":{{\"containers\":[{{\"name\":\"{validation.container_name}\",\"resources\":{{\"limits\":{{\"cpu\":\"500m\",\"memory\":\"512Mi\"}}}}}}]}}}}'"

    def _generate_fix_ratio_command(self, pod: PodResource, validation: ResourceValidation) -> str:
        """Generate oc command to fix ratio"""
        # Ratio validations carry type "invalid_ratio"; the message says whether CPU
        # or memory is affected. Recommended limits are calculated from a 3:1 ratio
        # and formatted as plain Kubernetes quantities (the display formatters add
        # units such as "cores"/"GiB" that the API would not accept).
        if "CPU" in validation.message:
            recommended_limit = pod.cpu_requests * 3
            limit_str = f"{int(recommended_limit * 1000)}m"
            return f"oc patch pod {pod.name} -n {pod.namespace} --type='merge' -p='{{\"spec\":{{\"containers\":[{{\"name\":\"{validation.container_name}\",\"resources\":{{\"limits\":{{\"cpu\":\"{limit_str}\"}}}}}}]}}}}'"
        elif "Memory" in validation.message:
            recommended_limit = pod.memory_requests * 3
            limit_str = f"{int(recommended_limit / (1024 * 1024))}Mi"
            return f"oc patch pod {pod.name} -n {pod.namespace} --type='merge' -p='{{\"spec\":{{\"containers\":[{{\"name\":\"{validation.container_name}\",\"resources\":{{\"limits\":{{\"memory\":\"{limit_str}\"}}}}}}]}}}}'"

        return ""
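
    # Typical flow (a sketch; PodResource instances are assumed to come from the
    # cluster collection layer elsewhere in the app):
    #
    #   service = ValidationService()
    #   validations = service.validate_pod_resources(pod)
    #   health = service.calculate_pod_health_score(pod, validations)
    #   recommendations = service.generate_recommendations(validations)
    #
    # The async variants (validate_pod_resources_with_historical_analysis,
    # get_cluster_health, ...) must be awaited by the caller.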