diff --git a/app/api/routes.py b/app/api/routes.py
index b8cbd20..f7755a9 100644
--- a/app/api/routes.py
+++ b/app/api/routes.py
@@ -45,22 +45,39 @@ async def get_cluster_status(
     pods = await k8s_client.get_all_pods()
     nodes_info = await k8s_client.get_nodes_info()
 
-    # Validate resources with historical analysis (includes static validations)
+    # Validate resources with historical analysis by workload (more reliable)
     all_validations = []
 
+    # Group pods by namespace for workload analysis
+    namespace_pods = {}
     for pod in pods:
-        # Historical analysis includes static validations
+        if pod.namespace not in namespace_pods:
+            namespace_pods[pod.namespace] = []
+        namespace_pods[pod.namespace].append(pod)
+
+    # Analyze each namespace's workloads
+    for namespace, namespace_pod_list in namespace_pods.items():
         try:
-            historical_validations = await validation_service.validate_pod_resources_with_historical_analysis(pod, "24h")
-            all_validations.extend(historical_validations)
+            # Use workload-based analysis (more reliable than individual pods)
+            workload_validations = await validation_service.validate_workload_resources_with_historical_analysis(
+                namespace_pod_list, "24h"
+            )
+            all_validations.extend(workload_validations)
         except Exception as e:
-            logger.warning(f"Error in historical analysis for pod {pod.name}: {e}")
-            # Fallback to static validations only if historical analysis fails
-            try:
-                static_validations = validation_service.validate_pod_resources(pod)
-                all_validations.extend(static_validations)
-            except Exception as static_e:
-                logger.error(f"Error in static validation for pod {pod.name}: {static_e}")
+            logger.warning(f"Error in workload analysis for namespace {namespace}: {e}")
+            # Fallback to individual pod analysis
+            for pod in namespace_pod_list:
+                try:
+                    pod_validations = await validation_service.validate_pod_resources_with_historical_analysis(pod, "24h")
+                    all_validations.extend(pod_validations)
+                except Exception as pod_e:
+                    logger.warning(f"Error in historical analysis for pod {pod.name}: {pod_e}")
+                    # Final fallback to static validations only
+                    try:
+                        static_validations = validation_service.validate_pod_resources(pod)
+                        all_validations.extend(static_validations)
+                    except Exception as static_e:
+                        logger.error(f"Error in static validation for pod {pod.name}: {static_e}")
 
     # Get overcommit information
     overcommit_info = await prometheus_client.get_cluster_overcommit()
diff --git a/app/services/historical_analysis.py b/app/services/historical_analysis.py
index 18973d7..c84f32e 100644
--- a/app/services/historical_analysis.py
+++ b/app/services/historical_analysis.py
@@ -36,6 +36,223 @@ class HistoricalAnalysisService:
         except (ValueError, TypeError):
             return default
 
+    def _extract_workload_name(self, pod_name: str) -> str:
+        """Extract workload name from pod name (remove pod suffix)"""
+        # Pod names typically follow pattern: workload-name-hash-suffix
+        # e.g., resource-governance-798b5579d6-7h298 -> resource-governance
+        parts = pod_name.split('-')
+        if len(parts) >= 3 and parts[-1].isalnum() and len(parts[-1]) == 5:
+            # Remove the last two parts (hash and suffix)
+            return '-'.join(parts[:-2])
+        elif len(parts) >= 2 and parts[-1].isalnum() and len(parts[-1]) == 5:
+            # Remove the last part (suffix)
+            return '-'.join(parts[:-1])
+        else:
+            # Fallback to pod name if pattern doesn't match
+            return pod_name
+
+    async def analyze_workload_historical_usage(
+        self,
+        pods: List[PodResource],
+        time_range: str = '24h'
+    ) -> List[ResourceValidation]:
+        """Analyze historical usage for a workload (group of pods)"""
+        if not pods:
+            return []
+
+        # Group pods by workload name
+        workload_pods = {}
+        for pod in pods:
+            workload_name = self._extract_workload_name(pod.name)
+            if workload_name not in workload_pods:
+                workload_pods[workload_name] = []
+            workload_pods[workload_name].append(pod)
+
+        all_validations = []
+
+        # Analyze each workload
+        for workload_name, workload_pod_list in workload_pods.items():
+            try:
+                # Use the first pod as representative for the workload
+                representative_pod = workload_pod_list[0]
+
+                # Analyze historical usage for the workload
+                workload_validations = await self._analyze_workload_metrics(
+                    workload_name, representative_pod.namespace, workload_pod_list, time_range
+                )
+                all_validations.extend(workload_validations)
+
+            except Exception as e:
+                logger.warning(f"Error analyzing workload {workload_name}: {e}")
+                # Fallback to individual pod analysis
+                for pod in workload_pod_list:
+                    try:
+                        pod_validations = await self.analyze_pod_historical_usage(pod, time_range)
+                        all_validations.extend(pod_validations)
+                    except Exception as pod_e:
+                        logger.warning(f"Error analyzing pod {pod.name}: {pod_e}")
+
+        return all_validations
+
+    async def _analyze_workload_metrics(
+        self,
+        workload_name: str,
+        namespace: str,
+        pods: List[PodResource],
+        time_range: str
+    ) -> List[ResourceValidation]:
+        """Analyze metrics for a workload using Prometheus queries"""
+        validations = []
+
+        try:
+            # Query for CPU usage by workload
+            cpu_query = f'''
+            sum(
+              node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{{
+                cluster="",
+                namespace="{namespace}"
+              }}
+            * on(namespace,pod)
+              group_left(workload, workload_type)
+              namespace_workload_pod:kube_pod_owner:relabel{{
+                cluster="",
+                namespace="{namespace}",
+                workload="{workload_name}",
+                workload_type=~".+"
+              }}
+            ) by (workload, workload_type)
+            '''
+
+            # Query for memory usage by workload
+            memory_query = f'''
+            sum(
+              container_memory_working_set_bytes{{
+                job="kubelet",
+                metrics_path="/metrics/cadvisor",
+                cluster="",
+                namespace="{namespace}",
+                container!="",
+                image!=""
+              }}
+            * on(namespace,pod)
+              group_left(workload, workload_type)
+              namespace_workload_pod:kube_pod_owner:relabel{{
+                cluster="",
+                namespace="{namespace}",
+                workload="{workload_name}",
+                workload_type=~".+"
+              }}
+            ) by (workload, workload_type)
+            '''
+
+            # Query for CPU requests by workload
+            cpu_requests_query = f'''
+            sum(
+              kube_pod_container_resource_requests{{
+                resource="cpu",
+                namespace="{namespace}"
+              }}
+            * on(namespace,pod)
+              group_left(workload, workload_type)
+              namespace_workload_pod:kube_pod_owner:relabel{{
+                cluster="",
+                namespace="{namespace}",
+                workload="{workload_name}",
+                workload_type=~".+"
+              }}
+            ) by (workload, workload_type)
+            '''
+
+            # Query for memory requests by workload
+            memory_requests_query = f'''
+            sum(
+              kube_pod_container_resource_requests{{
+                resource="memory",
+                namespace="{namespace}"
+              }}
+            * on(namespace,pod)
+              group_left(workload, workload_type)
+              namespace_workload_pod:kube_pod_owner:relabel{{
+                cluster="",
+                namespace="{namespace}",
+                workload="{workload_name}",
+                workload_type=~".+"
+              }}
+            ) by (workload, workload_type)
+            '''
+
+            # Query for CPU limits by workload
+            cpu_limits_query = f'''
+            sum(
+              kube_pod_container_resource_limits{{
+                resource="cpu",
+                namespace="{namespace}"
+              }}
+            * on(namespace,pod)
+              group_left(workload, workload_type)
+              namespace_workload_pod:kube_pod_owner:relabel{{
+                cluster="",
+                namespace="{namespace}",
+                workload="{workload_name}",
+                workload_type=~".+"
+              }}
+            ) by (workload, workload_type)
+            '''
+
+            # Query for memory limits by workload
+            memory_limits_query = f'''
+            sum(
+              kube_pod_container_resource_limits{{
+                resource="memory",
+                namespace="{namespace}"
+              }}
+            * on(namespace,pod)
+              group_left(workload, workload_type)
+              namespace_workload_pod:kube_pod_owner:relabel{{
+                cluster="",
+                namespace="{namespace}",
+                workload="{workload_name}",
+                workload_type=~".+"
+              }}
+            ) by (workload, workload_type)
+            '''
+
+            # Execute queries
+            cpu_usage_data = await self._query_prometheus(cpu_query, time_range)
+            memory_usage_data = await self._query_prometheus(memory_query, time_range)
+            cpu_requests_data = await self._query_prometheus(cpu_requests_query, time_range)
+            memory_requests_data = await self._query_prometheus(memory_requests_query, time_range)
+            cpu_limits_data = await self._query_prometheus(cpu_limits_query, time_range)
+            memory_limits_data = await self._query_prometheus(memory_limits_query, time_range)
+
+            # Analyze CPU metrics for workload
+            if cpu_usage_data and cpu_requests_data and cpu_limits_data:
+                cpu_validations = self._analyze_cpu_metrics(
+                    workload_name, namespace, "workload",
+                    cpu_usage_data, cpu_requests_data, cpu_limits_data, time_range
+                )
+                validations.extend(cpu_validations)
+
+            # Analyze memory metrics for workload
+            if memory_usage_data and memory_requests_data and memory_limits_data:
+                memory_validations = self._analyze_memory_metrics(
+                    workload_name, namespace, "workload",
+                    memory_usage_data, memory_requests_data, memory_limits_data, time_range
+                )
+                validations.extend(memory_validations)
+
+        except Exception as e:
+            logger.warning(f"Error analyzing workload metrics for {workload_name}: {e}")
+            # Fallback to individual pod analysis
+            for pod in pods:
+                try:
+                    pod_validations = await self.analyze_pod_historical_usage(pod, time_range)
+                    validations.extend(pod_validations)
+                except Exception as pod_e:
+                    logger.warning(f"Error analyzing pod {pod.name}: {pod_e}")
+
+        return validations
+
     async def analyze_pod_historical_usage(
         self,
         pod: PodResource,
@@ -319,6 +536,7 @@ class HistoricalAnalysisService:
                 message=f"Limited CPU usage data ({len(usage_values)} points) for {time_range}",
                 recommendation="Wait for more data points or extend time range for reliable analysis"
             ))
+            return validations  # Don't proceed with historical analysis if insufficient data
 
         # Current values of requests/limits
         current_requests = self._safe_float(requests_data[0][1]) if requests_data else 0
@@ -442,6 +660,7 @@ class HistoricalAnalysisService:
                 message=f"Limited memory usage data ({len(usage_values)} points) for {time_range}",
                 recommendation="Wait for more data points or extend time range for reliable analysis"
             ))
+            return validations  # Don't proceed with historical analysis if insufficient data
 
         # Current values of requests/limits (in bytes)
         current_requests = self._safe_float(requests_data[0][1]) if requests_data else 0
diff --git a/app/services/validation_service.py b/app/services/validation_service.py
index 9bd702f..2164b3e 100644
--- a/app/services/validation_service.py
+++ b/app/services/validation_service.py
@@ -64,6 +64,39 @@ class ValidationService:
             logger.warning(f"Error in historical analysis for pod {pod.name}: {e}")
             return static_validations
+
+    async def validate_workload_resources_with_historical_analysis(
+        self,
+        pods: List[PodResource],
+        time_range: str = '24h'
+    ) -> List[ResourceValidation]:
+        """Validate workload resources including historical analysis (recommended approach)"""
+        all_validations = []
+
+        # Static validations for all pods
+        for pod in pods:
+            static_validations = self.validate_pod_resources(pod)
+            all_validations.extend(static_validations)
+
+        # Historical analysis by workload (more reliable than individual pods)
+        try:
+            historical_validations = await self.historical_analysis.analyze_workload_historical_usage(
+                pods, time_range
+            )
+            all_validations.extend(historical_validations)
+        except Exception as e:
+            logger.warning(f"Error in workload historical analysis: {e}")
+            # Fallback to individual pod analysis
+            for pod in pods:
+                try:
+                    pod_historical = await self.historical_analysis.analyze_pod_historical_usage(
+                        pod, time_range
+                    )
+                    all_validations.extend(pod_historical)
+                except Exception as pod_e:
+                    logger.warning(f"Error in historical analysis for pod {pod.name}: {pod_e}")
+
+        return all_validations
 
     def _validate_container_resources(
         self,