""" Resource validation service following Red Hat best practices """ import logging from typing import List, Dict, Any, Optional from decimal import Decimal, InvalidOperation import re from app.models.resource_models import ( PodResource, ResourceValidation, NamespaceResources, QoSClassification, ResourceQuota, ClusterHealth, PodHealthScore, SimplifiedValidation ) from app.core.config import settings from app.services.historical_analysis import HistoricalAnalysisService from app.services.smart_recommendations import SmartRecommendationsService logger = logging.getLogger(__name__) class ValidationService: """Service for resource validation""" def __init__(self): self.cpu_ratio = settings.cpu_limit_ratio self.memory_ratio = settings.memory_limit_ratio self.min_cpu_request = settings.min_cpu_request self.min_memory_request = settings.min_memory_request self.historical_analysis = HistoricalAnalysisService() self.smart_recommendations = SmartRecommendationsService() def validate_pod_resources(self, pod: PodResource) -> List[ResourceValidation]: """Validate pod resources""" validations = [] for container in pod.containers: container_validations = self._validate_container_resources( pod.name, pod.namespace, container ) validations.extend(container_validations) return validations async def validate_pod_resources_with_historical_analysis( self, pod: PodResource, time_range: str = '24h' ) -> List[ResourceValidation]: """Validate pod resources including historical analysis""" # Static validations static_validations = self.validate_pod_resources(pod) # Historical analysis try: historical_validations = await self.historical_analysis.analyze_pod_historical_usage( pod, time_range ) static_validations.extend(historical_validations) except Exception as e: logger.warning(f"Error in historical analysis for pod {pod.name}: {e}") return static_validations def _validate_container_resources( self, pod_name: str, namespace: str, container: Any ) -> List[ResourceValidation]: """Validate container resources""" validations = [] resources = container["resources"] requests = resources.get("requests", {}) limits = resources.get("limits", {}) # Determine QoS class based on Red Hat best practices qos_class = self._determine_qos_class(requests, limits) # 1. Check if requests are defined if not requests: validations.append(ResourceValidation( pod_name=pod_name, namespace=namespace, container_name=container["name"], validation_type="missing_requests", severity="error", message="Container without defined requests", recommendation="Define CPU and memory requests to guarantee QoS (currently BestEffort class)" )) # 2. Check if limits are defined if not limits: validations.append(ResourceValidation( pod_name=pod_name, namespace=namespace, container_name=container["name"], validation_type="missing_limits", severity="warning", message="Container without defined limits", recommendation="Define limits to avoid excessive resource consumption" )) # 3. QoS Class validation based on Red Hat recommendations qos_validation = self._validate_qos_class(pod_name, namespace, container["name"], qos_class, requests, limits) if qos_validation: validations.append(qos_validation) # 3. Validate limit:request ratio if requests and limits: cpu_validation = self._validate_cpu_ratio( pod_name, namespace, container["name"], requests, limits ) if cpu_validation: validations.append(cpu_validation) memory_validation = self._validate_memory_ratio( pod_name, namespace, container["name"], requests, limits ) if memory_validation: validations.append(memory_validation) # 4. Add container resource metrics validation if requests or limits: metrics_validation = self._validate_container_metrics( pod_name, namespace, container["name"], requests, limits ) if metrics_validation: validations.append(metrics_validation) # 5. Validate minimum values if requests: min_validation = self._validate_minimum_values( pod_name, namespace, container["name"], requests ) validations.extend(min_validation) return validations def _validate_cpu_ratio( self, pod_name: str, namespace: str, container_name: str, requests: Dict[str, str], limits: Dict[str, str] ) -> ResourceValidation: """Validate CPU limit:request ratio""" if "cpu" not in requests or "cpu" not in limits: return None try: request_value = self._parse_cpu_value(requests["cpu"]) limit_value = self._parse_cpu_value(limits["cpu"]) if request_value > 0: ratio = limit_value / request_value if ratio > self.cpu_ratio: # Sem tolerância excessiva return ResourceValidation( pod_name=pod_name, namespace=namespace, container_name=container_name, validation_type="invalid_ratio", severity="warning", message=f"CPU limit:request ratio too high ({ratio:.2f}:1) - Request: {requests['cpu']}, Limit: {limits['cpu']}", recommendation=f"Consider reducing limits or increasing requests (recommended ratio: {self.cpu_ratio}:1)" ) elif ratio < 1.0: return ResourceValidation( pod_name=pod_name, namespace=namespace, container_name=container_name, validation_type="invalid_ratio", severity="error", message=f"CPU limit less than request ({ratio:.2f}:1) - Request: {requests['cpu']}, Limit: {limits['cpu']}", recommendation="CPU limit should be greater than or equal to request" ) except (ValueError, InvalidOperation) as e: logger.warning(f"Error validating CPU ratio: {e}") return None def _validate_memory_ratio( self, pod_name: str, namespace: str, container_name: str, requests: Dict[str, str], limits: Dict[str, str] ) -> ResourceValidation: """Validate memory limit:request ratio""" if "memory" not in requests or "memory" not in limits: return None try: request_value = self._parse_memory_value(requests["memory"]) limit_value = self._parse_memory_value(limits["memory"]) if request_value > 0: ratio = limit_value / request_value if ratio > self.memory_ratio: # Sem tolerância excessiva return ResourceValidation( pod_name=pod_name, namespace=namespace, container_name=container_name, validation_type="invalid_ratio", severity="warning", message=f"Memory limit:request ratio too high ({ratio:.2f}:1) - Request: {requests['memory']}, Limit: {limits['memory']}", recommendation=f"Consider reducing limits or increasing requests (recommended ratio: {self.memory_ratio}:1)" ) elif ratio < 1.0: return ResourceValidation( pod_name=pod_name, namespace=namespace, container_name=container_name, validation_type="invalid_ratio", severity="error", message=f"Memory limit less than request ({ratio:.2f}:1) - Request: {requests['memory']}, Limit: {limits['memory']}", recommendation="Memory limit should be greater than or equal to request" ) except (ValueError, InvalidOperation) as e: logger.warning(f"Error validating memory ratio: {e}") return None def _validate_container_metrics( self, pod_name: str, namespace: str, container_name: str, requests: Dict[str, str], limits: Dict[str, str] ) -> ResourceValidation: """Show container resource metrics and analysis""" try: # Parse CPU values cpu_request = requests.get("cpu", "0") cpu_limit = limits.get("cpu", "0") cpu_request_parsed = self._parse_cpu_value(cpu_request) cpu_limit_parsed = self._parse_cpu_value(cpu_limit) # Parse Memory values memory_request = requests.get("memory", "0") memory_limit = limits.get("memory", "0") memory_request_parsed = self._parse_memory_value(memory_request) memory_limit_parsed = self._parse_memory_value(memory_limit) # Calculate ratios cpu_ratio = cpu_limit_parsed / cpu_request_parsed if cpu_request_parsed > 0 else 0 memory_ratio = memory_limit_parsed / memory_request_parsed if memory_request_parsed > 0 else 0 # Format values for display cpu_request_display = f"{cpu_request_parsed:.1f} cores" if cpu_request_parsed >= 1.0 else f"{cpu_request_parsed * 1000:.0f}m" cpu_limit_display = f"{cpu_limit_parsed:.1f} cores" if cpu_limit_parsed >= 1.0 else f"{cpu_limit_parsed * 1000:.0f}m" memory_request_display = f"{memory_request_parsed / (1024*1024*1024):.1f} GiB" if memory_request_parsed >= 1024*1024*1024 else f"{memory_request_parsed / (1024*1024):.0f} MiB" memory_limit_display = f"{memory_limit_parsed / (1024*1024*1024):.1f} GiB" if memory_limit_parsed >= 1024*1024*1024 else f"{memory_limit_parsed / (1024*1024):.0f} MiB" # Create detailed message message = f"Container Resources - CPU: {cpu_request_display}→{cpu_limit_display} (ratio: {cpu_ratio:.1f}:1), Memory: {memory_request_display}→{memory_limit_display} (ratio: {memory_ratio:.1f}:1)" # Create recommendation based on ratios recommendations = [] if cpu_ratio > self.cpu_ratio: recommendations.append(f"CPU ratio {cpu_ratio:.1f}:1 exceeds recommended {self.cpu_ratio}:1") if memory_ratio > self.memory_ratio: recommendations.append(f"Memory ratio {memory_ratio:.1f}:1 exceeds recommended {self.memory_ratio}:1") recommendation = "; ".join(recommendations) if recommendations else f"Resource allocation within recommended ratios (CPU: {self.cpu_ratio}:1, Memory: {self.memory_ratio}:1)" return ResourceValidation( pod_name=pod_name, namespace=namespace, container_name=container_name, validation_type="container_metrics", severity="info", message=message, recommendation=recommendation ) except Exception as e: logger.warning(f"Error validating container metrics: {e}") return None def _validate_minimum_values( self, pod_name: str, namespace: str, container_name: str, requests: Dict[str, str] ) -> List[ResourceValidation]: """Validate minimum request values""" validations = [] # Validate minimum CPU if "cpu" in requests: try: request_value = self._parse_cpu_value(requests["cpu"]) min_value = self._parse_cpu_value(self.min_cpu_request) if request_value < min_value: validations.append(ResourceValidation( pod_name=pod_name, namespace=namespace, container_name=container_name, validation_type="minimum_value", severity="warning", message=f"CPU request too low ({requests['cpu']})", recommendation=f"Consider increasing to at least {self.min_cpu_request}" )) except (ValueError, InvalidOperation): pass # Validate minimum memory if "memory" in requests: try: request_value = self._parse_memory_value(requests["memory"]) min_value = self._parse_memory_value(self.min_memory_request) if request_value < min_value: validations.append(ResourceValidation( pod_name=pod_name, namespace=namespace, container_name=container_name, validation_type="minimum_value", severity="warning", message=f"Memory request too low ({requests['memory']})", recommendation=f"Consider increasing to at least {self.min_memory_request}" )) except (ValueError, InvalidOperation): pass return validations def _parse_cpu_value(self, value: str) -> float: """Convert CPU value to float (cores)""" if value.endswith('m'): return float(value[:-1]) / 1000 elif value.endswith('n'): return float(value[:-1]) / 1000000000 else: return float(value) def _parse_memory_value(self, value: str) -> int: """Convert memory value to bytes""" value = value.upper() if value.endswith('KI'): return int(float(value[:-2]) * 1024) elif value.endswith('MI'): return int(float(value[:-2]) * 1024 * 1024) elif value.endswith('GI'): return int(float(value[:-2]) * 1024 * 1024 * 1024) elif value.endswith('K'): return int(float(value[:-1]) * 1000) elif value.endswith('M'): return int(float(value[:-1]) * 1000 * 1000) elif value.endswith('G'): return int(float(value[:-1]) * 1000 * 1000 * 1000) else: return int(value) def _determine_qos_class(self, requests: Dict[str, str], limits: Dict[str, str]) -> str: """Determine QoS class based on requests and limits""" cpu_requests = self._parse_cpu_value(requests.get("cpu", "0")) memory_requests = self._parse_memory_value(requests.get("memory", "0")) / (1024 * 1024 * 1024) # Convert to GB cpu_limits = self._parse_cpu_value(limits.get("cpu", "0")) memory_limits = self._parse_memory_value(limits.get("memory", "0")) / (1024 * 1024 * 1024) # Convert to GB # Guaranteed: both CPU and memory requests and limits are set and equal if (cpu_requests > 0 and memory_requests > 0 and cpu_requests == cpu_limits and memory_requests == memory_limits): return "Guaranteed" # Burstable: at least one request is set elif cpu_requests > 0 or memory_requests > 0: return "Burstable" # BestEffort: no requests set else: return "BestEffort" def _validate_qos_class(self, pod_name: str, namespace: str, container_name: str, qos_class: str, requests: Dict[str, str], limits: Dict[str, str]) -> Optional[ResourceValidation]: """Validate QoS class and provide recommendations""" cpu_requests = self._parse_cpu_value(requests.get("cpu", "0")) memory_requests = self._parse_memory_value(requests.get("memory", "0")) / (1024 * 1024 * 1024) # Convert to GB cpu_limits = self._parse_cpu_value(limits.get("cpu", "0")) memory_limits = self._parse_memory_value(limits.get("memory", "0")) / (1024 * 1024 * 1024) # Convert to GB # Check for missing requests (BestEffort pods) - removed duplicate validation # This is already handled at container level in _validate_container_resources # Check for missing limits (Burstable pods) if qos_class == "Burstable" and (cpu_limits == 0 or memory_limits == 0): return ResourceValidation( pod_name=pod_name, namespace=namespace, container_name=container_name, validation_type="missing_limits", severity="warning", message="Pod has requests but no limits defined", recommendation="Define resource limits to prevent resource starvation", priority_score=5, workload_category="established", estimated_impact="low" ) return None def validate_namespace_overcommit( self, namespace_resources: NamespaceResources, node_capacity: Dict[str, str] ) -> List[ResourceValidation]: """Validate overcommit in a namespace""" validations = [] # Calculate total namespace requests total_cpu_requests = self._parse_cpu_value(namespace_resources.total_cpu_requests) total_memory_requests = self._parse_memory_value(namespace_resources.total_memory_requests) # Calculate total node capacity total_cpu_capacity = self._parse_cpu_value(node_capacity.get("cpu", "0")) total_memory_capacity = self._parse_memory_value(node_capacity.get("memory", "0")) # Check CPU overcommit if total_cpu_capacity > 0: cpu_utilization = (total_cpu_requests / total_cpu_capacity) * 100 if cpu_utilization > 100: validations.append(ResourceValidation( pod_name="namespace", namespace=namespace_resources.name, container_name="all", validation_type="overcommit", severity="critical", message=f"CPU overcommit in namespace: {cpu_utilization:.1f}%", recommendation="Reduce CPU requests or add more nodes to the cluster" )) # Check memory overcommit if total_memory_capacity > 0: memory_utilization = (total_memory_requests / total_memory_capacity) * 100 if memory_utilization > 100: validations.append(ResourceValidation( pod_name="namespace", namespace=namespace_resources.name, container_name="all", validation_type="overcommit", severity="critical", message=f"Memory overcommit in namespace: {memory_utilization:.1f}%", recommendation="Reduce memory requests or add more nodes to the cluster" )) return validations def generate_recommendations(self, validations: List[ResourceValidation]) -> List[str]: """Generate recommendations based on validations""" recommendations = [] # Group validations by type validation_counts = {} for validation in validations: validation_type = validation.validation_type if validation_type not in validation_counts: validation_counts[validation_type] = 0 validation_counts[validation_type] += 1 # Generate recommendations based on found issues if validation_counts.get("missing_requests", 0) > 0: recommendations.append( f"Implement LimitRange in namespace to define default requests " f"({validation_counts['missing_requests']} containers without requests)" ) if validation_counts.get("missing_limits", 0) > 0: recommendations.append( f"Define limits for {validation_counts['missing_limits']} containers " "to avoid excessive resource consumption" ) if validation_counts.get("invalid_ratio", 0) > 0: recommendations.append( f"Adjust limit:request ratio for {validation_counts['invalid_ratio']} containers " f"(recommended: {self.cpu_ratio}:1)" ) if validation_counts.get("overcommit", 0) > 0: recommendations.append( f"Resolve overcommit in {validation_counts['overcommit']} namespaces " "to avoid performance issues" ) return recommendations async def validate_pod_resources_with_categorization( self, pod: PodResource, workload_category: str = None, priority_score: int = None ) -> List[ResourceValidation]: """Validate pod resources with enhanced categorization and scoring""" validations = self.validate_pod_resources(pod) # Add categorization and scoring to validations for validation in validations: validation.workload_category = workload_category validation.priority_score = priority_score or self._calculate_priority_score(validation) validation.estimated_impact = self._determine_impact(validation.priority_score) return validations async def validate_pod_resources_with_smart_analysis( self, pod: PodResource, time_range: str = '24h' ) -> List[ResourceValidation]: """Validate pod resources with smart analysis including historical data""" # Static validations static_validations = self.validate_pod_resources(pod) # Get workload category workload_category = await self._categorize_workload(pod) # Get smart recommendations smart_recommendations = await self.smart_recommendations.generate_smart_recommendations([pod], [workload_category]) # Enhance validations with smart analysis enhanced_validations = [] for validation in static_validations: validation.workload_category = workload_category.category validation.priority_score = self._calculate_priority_score(validation) validation.estimated_impact = self._determine_impact(validation.priority_score) enhanced_validations.append(validation) # Add smart recommendations as validations for recommendation in smart_recommendations: smart_validation = ResourceValidation( pod_name=pod.name, namespace=pod.namespace, container_name="workload", validation_type="smart_recommendation", severity=recommendation.priority, message=recommendation.title, recommendation=recommendation.description, priority_score=self._get_priority_score_from_string(recommendation.priority), workload_category=workload_category.category, estimated_impact=recommendation.estimated_impact ) enhanced_validations.append(smart_validation) return enhanced_validations async def _categorize_workload(self, pod: PodResource) -> Any: """Categorize a single workload""" categories = await self.smart_recommendations.categorize_workloads([pod]) return categories[0] if categories else None def _get_priority_score_from_string(self, priority: str) -> int: """Convert priority string to numeric score""" priority_map = { "critical": 10, "high": 8, "medium": 5, "low": 2 } return priority_map.get(priority, 5) def _calculate_priority_score(self, validation: ResourceValidation) -> int: """Calculate priority score for validation (1-10)""" score = 1 # Base score by severity if validation.severity == "critical": score += 4 elif validation.severity == "error": score += 3 elif validation.severity == "warning": score += 1 # Add score by validation type if validation.validation_type == "missing_requests": score += 3 elif validation.validation_type == "missing_limits": score += 2 elif validation.validation_type == "invalid_ratio": score += 1 elif validation.validation_type == "overcommit": score += 4 # Add score for production namespaces if validation.namespace in ["default", "production", "prod"]: score += 2 return min(score, 10) def _determine_impact(self, priority_score: int) -> str: """Determine estimated impact based on priority score""" if priority_score >= 8: return "critical" elif priority_score >= 6: return "high" elif priority_score >= 4: return "medium" else: return "low" async def get_workload_categories(self, pods: List[PodResource]) -> List[Any]: """Get workload categories for all pods""" return await self.smart_recommendations.categorize_workloads(pods) async def get_smart_recommendations(self, pods: List[PodResource]) -> List[Any]: """Get smart recommendations for all workloads""" categories = await self.get_workload_categories(pods) return await self.smart_recommendations.generate_smart_recommendations(pods, categories) def classify_qos(self, pod: PodResource) -> QoSClassification: """Classify pod QoS based on Red Hat best practices""" cpu_requests = pod.cpu_requests memory_requests = pod.memory_requests cpu_limits = pod.cpu_limits memory_limits = pod.memory_limits # Determine QoS class if (cpu_requests > 0 and memory_requests > 0 and cpu_limits > 0 and memory_limits > 0 and cpu_requests == cpu_limits and memory_requests == memory_limits): qos_class = "Guaranteed" efficiency_score = 1.0 elif (cpu_requests > 0 or memory_requests > 0): qos_class = "Burstable" # Calculate efficiency based on request/limit ratio cpu_efficiency = cpu_requests / cpu_limits if cpu_limits > 0 else 0.5 memory_efficiency = memory_requests / memory_limits if memory_limits > 0 else 0.5 efficiency_score = (cpu_efficiency + memory_efficiency) / 2 else: qos_class = "BestEffort" efficiency_score = 0.0 # Generate recommendation recommendation = None if qos_class == "BestEffort": recommendation = "Define CPU and memory requests for better resource management" elif qos_class == "Burstable" and efficiency_score < 0.3: recommendation = "Consider setting limits closer to requests for better predictability" elif qos_class == "Guaranteed": recommendation = "Optimal QoS configuration for production workloads" return QoSClassification( pod_name=pod.name, namespace=pod.namespace, qos_class=qos_class, cpu_requests=cpu_requests, memory_requests=memory_requests, cpu_limits=cpu_limits, memory_limits=memory_limits, efficiency_score=efficiency_score, recommendation=recommendation ) async def analyze_resource_quotas(self, namespaces: List[str]) -> List[ResourceQuota]: """Analyze Resource Quotas for namespaces""" quotas = [] for namespace in namespaces: # This would typically query the Kubernetes API # For now, we'll simulate the analysis quota = ResourceQuota( namespace=namespace, name=f"quota-{namespace}", status="Missing", # Would be determined by API call usage_percentage=0.0, recommended_quota={ "cpu": "2000m", "memory": "8Gi", "pods": "20" } ) quotas.append(quota) return quotas async def _get_cluster_capacity(self) -> tuple[float, float, int]: """Get real cluster capacity from nodes""" try: from kubernetes import client v1 = client.CoreV1Api() nodes = v1.list_node() total_cpu_cores = 0.0 total_memory_bytes = 0.0 total_nodes = len(nodes.items) for node in nodes.items: # Parse CPU capacity cpu_capacity = node.status.capacity.get("cpu", "0") total_cpu_cores += self._parse_cpu_value(cpu_capacity) # Parse Memory capacity memory_capacity = node.status.capacity.get("memory", "0") total_memory_bytes += self._parse_memory_value(memory_capacity) # Convert memory to GiB total_memory_gib = total_memory_bytes / (1024 * 1024 * 1024) return total_cpu_cores, total_memory_gib, total_nodes except Exception as e: logger.warning(f"Could not get real cluster capacity: {e}. Using fallback values.") # Fallback values based on typical OpenShift cluster return 24.0, 70.0, 6 async def get_cluster_health(self, pods: List[PodResource]) -> ClusterHealth: """Get cluster health overview with overcommit analysis""" total_pods = len(pods) total_namespaces = len(set(pod.namespace for pod in pods)) # Calculate cluster resource totals cluster_cpu_requests = sum(pod.cpu_requests for pod in pods) cluster_memory_requests = sum(pod.memory_requests for pod in pods) cluster_cpu_limits = sum(pod.cpu_limits for pod in pods) cluster_memory_limits = sum(pod.memory_limits for pod in pods) # Get real cluster capacity cluster_cpu_capacity, cluster_memory_capacity, total_nodes = await self._get_cluster_capacity() # Calculate overcommit percentages cpu_overcommit = (cluster_cpu_requests / cluster_cpu_capacity) * 100 # Convert memory capacity from GiB to bytes for consistent calculation cluster_memory_capacity_bytes = cluster_memory_capacity * (1024 * 1024 * 1024) memory_overcommit = (cluster_memory_requests / cluster_memory_capacity_bytes) * 100 # Determine overall health if cpu_overcommit > 150 or memory_overcommit > 150: overall_health = "Critical" elif cpu_overcommit > 120 or memory_overcommit > 120: overall_health = "Warning" else: overall_health = "Healthy" # Count critical issues critical_issues = sum(1 for pod in pods if pod.cpu_requests == 0 or pod.memory_requests == 0) # Get top resource consumers top_consumers = sorted( pods, key=lambda p: p.cpu_requests + p.memory_requests, reverse=True )[:10] # QoS distribution qos_distribution = {"Guaranteed": 0, "Burstable": 0, "BestEffort": 0} for pod in pods: qos = self.classify_qos(pod) qos_distribution[qos.qos_class] += 1 return ClusterHealth( total_pods=total_pods, total_namespaces=total_namespaces, total_nodes=total_nodes, cluster_cpu_capacity=cluster_cpu_capacity, cluster_memory_capacity=cluster_memory_capacity, cluster_cpu_requests=cluster_cpu_requests, cluster_memory_requests=cluster_memory_requests, cluster_cpu_limits=cluster_cpu_limits, cluster_memory_limits=cluster_memory_limits, cpu_overcommit_percentage=cpu_overcommit, memory_overcommit_percentage=memory_overcommit, overall_health=overall_health, critical_issues=critical_issues, namespaces_in_overcommit=len([ns for ns in set(pod.namespace for pod in pods) if self._is_namespace_in_overcommit(ns, pods)]), top_resource_consumers=[ { "name": pod.name, "namespace": pod.namespace, "cpu_requests": pod.cpu_requests, "memory_requests": pod.memory_requests, "qos_class": self.classify_qos(pod).qos_class } for pod in top_consumers ], qos_distribution=qos_distribution, resource_quota_coverage=self._calculate_resource_quota_coverage(pods) ) def _is_namespace_in_overcommit(self, namespace: str, pods: List[PodResource]) -> bool: """Check if namespace is in overcommit""" namespace_pods = [pod for pod in pods if pod.namespace == namespace] if not namespace_pods: return False # Simple overcommit check: if any pod has limits > requests for pod in namespace_pods: if pod.cpu_limits > pod.cpu_requests or pod.memory_limits > pod.memory_requests: return True return False def _calculate_resource_quota_coverage(self, pods: List[PodResource]) -> float: """Calculate resource quota coverage percentage""" namespaces = set(pod.namespace for pod in pods) if not namespaces: return 0.0 # For now, return a simple calculation based on namespace count # In a real implementation, this would check actual ResourceQuota objects return min(len(namespaces) * 0.2, 1.0) # 20% per namespace, max 100% def calculate_pod_health_score(self, pod: PodResource, validations: List[ResourceValidation]) -> PodHealthScore: """Calculate pod health score and create simplified display""" # Calculate health score (0-10) health_score = 10 # Deduct points for issues for validation in validations: if validation.severity == "critical": health_score -= 3 elif validation.severity == "error": health_score -= 2 elif validation.severity == "warning": health_score -= 1 # Ensure score is between 0-10 health_score = max(0, min(10, health_score)) # Determine health status and visual indicators if health_score >= 9: health_status = "Excellent" status_color = "green" status_icon = "✅" elif health_score >= 7: health_status = "Good" status_color = "green" status_icon = "✅" elif health_score >= 5: health_status = "Medium" status_color = "yellow" status_icon = "🟡" elif health_score >= 3: health_status = "Poor" status_color = "orange" status_icon = "🟠" else: health_status = "Critical" status_color = "red" status_icon = "🔴" # Create simplified resource display cpu_display, cpu_status = self._create_cpu_display(pod) memory_display, memory_status = self._create_memory_display(pod) # Group validations by severity critical_issues = [] warnings = [] info_items = [] for validation in validations: if validation.severity == "critical": critical_issues.append(validation.message) elif validation.severity in ["error", "warning"]: warnings.append(validation.message) else: info_items.append(validation.message) # Determine available actions available_actions = self._determine_available_actions(validations) oc_commands = self._generate_oc_commands(pod, validations) return PodHealthScore( pod_name=pod.name, namespace=pod.namespace, health_score=health_score, health_status=health_status, status_color=status_color, status_icon=status_icon, cpu_display=cpu_display, memory_display=memory_display, cpu_status=cpu_status, memory_status=memory_status, critical_issues=critical_issues, warnings=warnings, info_items=info_items, available_actions=available_actions, oc_commands=oc_commands ) def _create_cpu_display(self, pod: PodResource) -> tuple[str, str]: """Create CPU display string and status""" if pod.cpu_requests == 0 and pod.cpu_limits == 0: return "No CPU resources defined", "🔴" # Format CPU values cpu_req_str = self._format_cpu_value(pod.cpu_requests) cpu_lim_str = self._format_cpu_value(pod.cpu_limits) # Calculate ratio if pod.cpu_requests > 0: ratio = pod.cpu_limits / pod.cpu_requests ratio_str = f"({ratio:.1f}:1 ratio)" else: ratio_str = "(no requests)" display = f"{cpu_req_str} → {cpu_lim_str} {ratio_str}" # Determine status if pod.cpu_requests == 0: status = "🔴" # No requests elif pod.cpu_limits == 0: status = "🟡" # No limits elif pod.cpu_requests > 0 and pod.cpu_limits > 0: ratio = pod.cpu_limits / pod.cpu_requests if ratio > 5: status = "🔴" # Very high ratio elif ratio > 3: status = "🟡" # High ratio else: status = "✅" # Good ratio else: status = "🔴" return display, status def _create_memory_display(self, pod: PodResource) -> tuple[str, str]: """Create memory display string and status""" if pod.memory_requests == 0 and pod.memory_limits == 0: return "No memory resources defined", "🔴" # Format memory values mem_req_str = self._format_memory_value(pod.memory_requests) mem_lim_str = self._format_memory_value(pod.memory_limits) # Calculate ratio if pod.memory_requests > 0: ratio = pod.memory_limits / pod.memory_requests ratio_str = f"({ratio:.1f}:1 ratio)" else: ratio_str = "(no requests)" display = f"{mem_req_str} → {mem_lim_str} {ratio_str}" # Determine status if pod.memory_requests == 0: status = "🔴" # No requests elif pod.memory_limits == 0: status = "🟡" # No limits elif pod.memory_requests > 0 and pod.memory_limits > 0: ratio = pod.memory_limits / pod.memory_requests if ratio > 5: status = "🔴" # Very high ratio elif ratio > 3: status = "🟡" # High ratio else: status = "✅" # Good ratio else: status = "🔴" return display, status def _format_cpu_value(self, value: float) -> str: """Format CPU value for display""" if value >= 1.0: return f"{value:.1f} cores" else: return f"{int(value * 1000)}m" def _format_memory_value(self, value_bytes: float) -> str: """Format memory value for display""" if value_bytes >= 1024 * 1024 * 1024: # >= 1 GiB return f"{value_bytes / (1024 * 1024 * 1024):.1f} GiB" else: return f"{int(value_bytes / (1024 * 1024))} MiB" def _determine_available_actions(self, validations: List[ResourceValidation]) -> List[str]: """Determine available actions based on validations""" actions = [] for validation in validations: if validation.validation_type == "missing_requests": actions.append("add_requests") elif validation.validation_type == "missing_limits": actions.append("add_limits") elif validation.validation_type == "cpu_ratio": actions.append("fix_cpu_ratio") elif validation.validation_type == "memory_ratio": actions.append("fix_memory_ratio") return list(set(actions)) # Remove duplicates def _generate_oc_commands(self, pod: PodResource, validations: List[ResourceValidation]) -> List[str]: """Generate oc commands for fixing issues""" commands = [] # Generate commands for each validation for validation in validations: if validation.validation_type == "missing_requests": cmd = self._generate_add_requests_command(pod, validation) if cmd: commands.append(cmd) elif validation.validation_type == "missing_limits": cmd = self._generate_add_limits_command(pod, validation) if cmd: commands.append(cmd) elif validation.validation_type in ["cpu_ratio", "memory_ratio"]: cmd = self._generate_fix_ratio_command(pod, validation) if cmd: commands.append(cmd) return commands def _generate_add_requests_command(self, pod: PodResource, validation: ResourceValidation) -> str: """Generate oc command to add requests""" # This would need to be implemented based on specific container return f"oc patch pod {pod.name} -n {pod.namespace} --type='merge' -p='{{\"spec\":{{\"containers\":[{{\"name\":\"{validation.container_name}\",\"resources\":{{\"requests\":{{\"cpu\":\"100m\",\"memory\":\"128Mi\"}}}}}}]}}}}'" def _generate_add_limits_command(self, pod: PodResource, validation: ResourceValidation) -> str: """Generate oc command to add limits""" return f"oc patch pod {pod.name} -n {pod.namespace} --type='merge' -p='{{\"spec\":{{\"containers\":[{{\"name\":\"{validation.container_name}\",\"resources\":{{\"limits\":{{\"cpu\":\"500m\",\"memory\":\"512Mi\"}}}}}}]}}}}'" def _generate_fix_ratio_command(self, pod: PodResource, validation: ResourceValidation) -> str: """Generate oc command to fix ratio""" # Calculate recommended limits based on 3:1 ratio if validation.validation_type == "cpu_ratio": recommended_limit = pod.cpu_requests * 3 limit_str = self._format_cpu_value(recommended_limit) return f"oc patch pod {pod.name} -n {pod.namespace} --type='merge' -p='{{\"spec\":{{\"containers\":[{{\"name\":\"{validation.container_name}\",\"resources\":{{\"limits\":{{\"cpu\":\"{limit_str}\"}}}}}}]}}}}'" elif validation.validation_type == "memory_ratio": recommended_limit = pod.memory_requests * 3 limit_str = self._format_memory_value(recommended_limit) return f"oc patch pod {pod.name} -n {pod.namespace} --type='merge' -p='{{\"spec\":{{\"containers\":[{{\"name\":\"{validation.container_name}\",\"resources\":{{\"limits\":{{\"memory\":\"{limit_str}\"}}}}}}]}}}}'" return ""