Fix historical analysis contradictions and implement workload-based analysis

- Fix the insufficient_historical_data vs. historical_analysis contradiction
- Add an early return when data is insufficient, preventing the P99 calculation
- Implement workload-based historical analysis instead of pod-based
- Add _extract_workload_name() to identify the workload from pod names
- Add analyze_workload_historical_usage() for workload-level analysis
- Add _analyze_workload_metrics() with Prometheus workload queries
- Add validate_workload_resources_with_historical_analysis() method
- Update the /cluster/status endpoint to use workload analysis by namespace
- Improve reliability by analyzing workloads instead of individual pods
- Maintain a fallback to pod-level analysis if workload analysis fails (fallback chain sketched below)
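
The net effect is a three-level fallback chain. A minimal sketch of the control flow, assuming the service methods named in the diff below (error handling simplified, bodies elided):

# Sketch only: shows the fallback order this commit introduces, not the real implementation.
async def analyze_namespace(validation_service, namespace_pod_list, time_range="24h"):
    try:
        # 1. Preferred: one aggregate historical analysis per workload
        return await validation_service.validate_workload_resources_with_historical_analysis(
            namespace_pod_list, time_range
        )
    except Exception:
        results = []
        for pod in namespace_pod_list:
            try:
                # 2. Fallback: historical analysis per individual pod
                results.extend(
                    await validation_service.validate_pod_resources_with_historical_analysis(pod, time_range)
                )
            except Exception:
                # 3. Last resort: static request/limit checks only
                results.extend(validation_service.validate_pod_resources(pod))
        return results
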
@@ -45,17 +45,34 @@ async def get_cluster_status(
     pods = await k8s_client.get_all_pods()
     nodes_info = await k8s_client.get_nodes_info()

-    # Validate resources with historical analysis (includes static validations)
+    # Validate resources with historical analysis by workload (more reliable)
     all_validations = []

+    # Group pods by namespace for workload analysis
+    namespace_pods = {}
     for pod in pods:
-        # Historical analysis includes static validations
+        if pod.namespace not in namespace_pods:
+            namespace_pods[pod.namespace] = []
+        namespace_pods[pod.namespace].append(pod)
+
+    # Analyze each namespace's workloads
+    for namespace, namespace_pod_list in namespace_pods.items():
         try:
-            historical_validations = await validation_service.validate_pod_resources_with_historical_analysis(pod, "24h")
-            all_validations.extend(historical_validations)
+            # Use workload-based analysis (more reliable than individual pods)
+            workload_validations = await validation_service.validate_workload_resources_with_historical_analysis(
+                namespace_pod_list, "24h"
+            )
+            all_validations.extend(workload_validations)
         except Exception as e:
-            logger.warning(f"Error in historical analysis for pod {pod.name}: {e}")
-            # Fallback to static validations only if historical analysis fails
+            logger.warning(f"Error in workload analysis for namespace {namespace}: {e}")
+            # Fallback to individual pod analysis
+            for pod in namespace_pod_list:
+                try:
+                    pod_validations = await validation_service.validate_pod_resources_with_historical_analysis(pod, "24h")
+                    all_validations.extend(pod_validations)
+                except Exception as pod_e:
+                    logger.warning(f"Error in historical analysis for pod {pod.name}: {pod_e}")
+                    # Final fallback to static validations only
                     try:
                         static_validations = validation_service.validate_pod_resources(pod)
                         all_validations.extend(static_validations)
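
A side note on the grouping loop above: collections.defaultdict expresses the same thing more idiomatically, should a follow-up want to tighten it (behavior identical):

from collections import defaultdict

# Each namespace maps to the list of its pods; missing keys start as [].
namespace_pods = defaultdict(list)
for pod in pods:
    namespace_pods[pod.namespace].append(pod)
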
@@ -36,6 +36,223 @@ class HistoricalAnalysisService:
         except (ValueError, TypeError):
             return default

+    def _extract_workload_name(self, pod_name: str) -> str:
+        """Extract workload name from pod name (remove pod suffix)"""
+        # Pod names typically follow pattern: workload-name-hash-suffix
+        # e.g., resource-governance-798b5579d6-7h298 -> resource-governance
+        parts = pod_name.split('-')
+        if len(parts) >= 3 and parts[-1].isalnum() and len(parts[-1]) == 5:
+            # Remove the last two parts (hash and suffix)
+            return '-'.join(parts[:-2])
+        elif len(parts) >= 2 and parts[-1].isalnum() and len(parts[-1]) == 5:
+            # Remove the last part (suffix)
+            return '-'.join(parts[:-1])
+        else:
+            # Fallback to pod name if pattern doesn't match
+            return pod_name
+
+    async def analyze_workload_historical_usage(
+        self,
+        pods: List[PodResource],
+        time_range: str = '24h'
+    ) -> List[ResourceValidation]:
+        """Analyze historical usage for a workload (group of pods)"""
+        if not pods:
+            return []
+
+        # Group pods by workload name
+        workload_pods = {}
+        for pod in pods:
+            workload_name = self._extract_workload_name(pod.name)
+            if workload_name not in workload_pods:
+                workload_pods[workload_name] = []
+            workload_pods[workload_name].append(pod)
+
+        all_validations = []
+
+        # Analyze each workload
+        for workload_name, workload_pod_list in workload_pods.items():
+            try:
+                # Use the first pod as representative for the workload
+                representative_pod = workload_pod_list[0]
+
+                # Analyze historical usage for the workload
+                workload_validations = await self._analyze_workload_metrics(
+                    workload_name, representative_pod.namespace, workload_pod_list, time_range
+                )
+                all_validations.extend(workload_validations)
+
+            except Exception as e:
+                logger.warning(f"Error analyzing workload {workload_name}: {e}")
+                # Fallback to individual pod analysis
+                for pod in workload_pod_list:
+                    try:
+                        pod_validations = await self.analyze_pod_historical_usage(pod, time_range)
+                        all_validations.extend(pod_validations)
+                    except Exception as pod_e:
+                        logger.warning(f"Error analyzing pod {pod.name}: {pod_e}")
+
+        return all_validations
+
+    async def _analyze_workload_metrics(
+        self,
+        workload_name: str,
+        namespace: str,
+        pods: List[PodResource],
+        time_range: str
+    ) -> List[ResourceValidation]:
+        """Analyze metrics for a workload using Prometheus queries"""
+        validations = []
+
+        try:
+            # Query for CPU usage by workload
+            cpu_query = f'''
+                sum(
+                    node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{{
+                        cluster="",
+                        namespace="{namespace}"
+                    }}
+                    * on(namespace,pod)
+                    group_left(workload, workload_type)
+                    namespace_workload_pod:kube_pod_owner:relabel{{
+                        cluster="",
+                        namespace="{namespace}",
+                        workload="{workload_name}",
+                        workload_type=~".+"
+                    }}
+                ) by (workload, workload_type)
+            '''
+
+            # Query for memory usage by workload
+            memory_query = f'''
+                sum(
+                    container_memory_working_set_bytes{{
+                        job="kubelet",
+                        metrics_path="/metrics/cadvisor",
+                        cluster="",
+                        namespace="{namespace}",
+                        container!="",
+                        image!=""
+                    }}
+                    * on(namespace,pod)
+                    group_left(workload, workload_type)
+                    namespace_workload_pod:kube_pod_owner:relabel{{
+                        cluster="",
+                        namespace="{namespace}",
+                        workload="{workload_name}",
+                        workload_type=~".+"
+                    }}
+                ) by (workload, workload_type)
+            '''
+
+            # Query for CPU requests by workload
+            cpu_requests_query = f'''
+                sum(
+                    kube_pod_container_resource_requests{{
+                        resource="cpu",
+                        namespace="{namespace}"
+                    }}
+                    * on(namespace,pod)
+                    group_left(workload, workload_type)
+                    namespace_workload_pod:kube_pod_owner:relabel{{
+                        cluster="",
+                        namespace="{namespace}",
+                        workload="{workload_name}",
+                        workload_type=~".+"
+                    }}
+                ) by (workload, workload_type)
+            '''
+
+            # Query for memory requests by workload
+            memory_requests_query = f'''
+                sum(
+                    kube_pod_container_resource_requests{{
+                        resource="memory",
+                        namespace="{namespace}"
+                    }}
+                    * on(namespace,pod)
+                    group_left(workload, workload_type)
+                    namespace_workload_pod:kube_pod_owner:relabel{{
+                        cluster="",
+                        namespace="{namespace}",
+                        workload="{workload_name}",
+                        workload_type=~".+"
+                    }}
+                ) by (workload, workload_type)
+            '''
+
+            # Query for CPU limits by workload
+            cpu_limits_query = f'''
+                sum(
+                    kube_pod_container_resource_limits{{
+                        resource="cpu",
+                        namespace="{namespace}"
+                    }}
+                    * on(namespace,pod)
+                    group_left(workload, workload_type)
+                    namespace_workload_pod:kube_pod_owner:relabel{{
+                        cluster="",
+                        namespace="{namespace}",
+                        workload="{workload_name}",
+                        workload_type=~".+"
+                    }}
+                ) by (workload, workload_type)
+            '''
+
+            # Query for memory limits by workload
+            memory_limits_query = f'''
+                sum(
+                    kube_pod_container_resource_limits{{
+                        resource="memory",
+                        namespace="{namespace}"
+                    }}
+                    * on(namespace,pod)
+                    group_left(workload, workload_type)
+                    namespace_workload_pod:kube_pod_owner:relabel{{
+                        cluster="",
+                        namespace="{namespace}",
+                        workload="{workload_name}",
+                        workload_type=~".+"
+                    }}
+                ) by (workload, workload_type)
+            '''
+
+            # Execute queries
+            cpu_usage_data = await self._query_prometheus(cpu_query, time_range)
+            memory_usage_data = await self._query_prometheus(memory_query, time_range)
+            cpu_requests_data = await self._query_prometheus(cpu_requests_query, time_range)
+            memory_requests_data = await self._query_prometheus(memory_requests_query, time_range)
+            cpu_limits_data = await self._query_prometheus(cpu_limits_query, time_range)
+            memory_limits_data = await self._query_prometheus(memory_limits_query, time_range)
+
+            # Analyze CPU metrics for workload
+            if cpu_usage_data and cpu_requests_data and cpu_limits_data:
+                cpu_validations = self._analyze_cpu_metrics(
+                    workload_name, namespace, "workload",
+                    cpu_usage_data, cpu_requests_data, cpu_limits_data, time_range
+                )
+                validations.extend(cpu_validations)
+
+            # Analyze memory metrics for workload
+            if memory_usage_data and memory_requests_data and memory_limits_data:
+                memory_validations = self._analyze_memory_metrics(
+                    workload_name, namespace, "workload",
+                    memory_usage_data, memory_requests_data, memory_limits_data, time_range
+                )
+                validations.extend(memory_validations)
+
+        except Exception as e:
+            logger.warning(f"Error analyzing workload metrics for {workload_name}: {e}")
+            # Fallback to individual pod analysis
+            for pod in pods:
+                try:
+                    pod_validations = await self.analyze_pod_historical_usage(pod, time_range)
+                    validations.extend(pod_validations)
+                except Exception as pod_e:
+                    logger.warning(f"Error analyzing pod {pod.name}: {pod_e}")
+
+        return validations
+
     async def analyze_pod_historical_usage(
         self,
         pod: PodResource,
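
How _extract_workload_name() behaves on concrete names is worth spelling out, since the grouping rests on it. The first example comes from the code comment; the other two inputs are hypothetical. The heuristic keys only on a 5-character trailing segment, so it over-strips names whose second-to-last segment is not a ReplicaSet hash; reading the pod's owner reference (which the kube_pod_owner-based queries above already consult) would be the authoritative alternative.

# svc: an instance of HistoricalAnalysisService (construction elided)
svc._extract_workload_name("resource-governance-798b5579d6-7h298")
# -> "resource-governance" (Deployment pod: hash + 5-char suffix stripped)
svc._extract_workload_name("web-0")
# -> "web-0" (StatefulSet pod: no 5-char suffix, falls through unchanged)
svc._extract_workload_name("cache-warmer-x7k2p")
# -> "cache" (over-strips: any 5-char alnum tail removes the last TWO parts)
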
@@ -319,6 +536,7 @@ class HistoricalAnalysisService:
                 message=f"Limited CPU usage data ({len(usage_values)} points) for {time_range}",
                 recommendation="Wait for more data points or extend time range for reliable analysis"
             ))
+            return validations  # Don't proceed with historical analysis if insufficient data

         # Current values of requests/limits
         current_requests = self._safe_float(requests_data[0][1]) if requests_data else 0
@@ -442,6 +660,7 @@ class HistoricalAnalysisService:
                 message=f"Limited memory usage data ({len(usage_values)} points) for {time_range}",
                 recommendation="Wait for more data points or extend time range for reliable analysis"
             ))
+            return validations  # Don't proceed with historical analysis if insufficient data

         # Current values of requests/limits (in bytes)
         current_requests = self._safe_float(requests_data[0][1]) if requests_data else 0
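
These two one-line additions are the actual contradiction fix: previously the method appended the insufficient-data warning and then fell through to compute a P99 from those same few samples, emitting a historical_analysis result alongside insufficient_historical_data. The guard pattern in isolation, with an illustrative threshold (the real one lives in the service):

import math

MIN_POINTS = 10  # illustrative threshold, not the value from the file

def p99(values):
    """Nearest-rank 99th percentile."""
    ordered = sorted(values)
    return ordered[max(0, math.ceil(0.99 * len(ordered)) - 1)]

def analyze(usage_values):
    validations = []
    if len(usage_values) < MIN_POINTS:
        validations.append(f"insufficient_historical_data ({len(usage_values)} points)")
        return validations  # stop here: a P99 over a handful of samples is noise
    validations.append(f"historical_analysis p99={p99(usage_values):.3f}")
    return validations
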
@@ -65,6 +65,39 @@ class ValidationService:

         return static_validations

+    async def validate_workload_resources_with_historical_analysis(
+        self,
+        pods: List[PodResource],
+        time_range: str = '24h'
+    ) -> List[ResourceValidation]:
+        """Validate workload resources including historical analysis (recommended approach)"""
+        all_validations = []
+
+        # Static validations for all pods
+        for pod in pods:
+            static_validations = self.validate_pod_resources(pod)
+            all_validations.extend(static_validations)
+
+        # Historical analysis by workload (more reliable than individual pods)
+        try:
+            historical_validations = await self.historical_analysis.analyze_workload_historical_usage(
+                pods, time_range
+            )
+            all_validations.extend(historical_validations)
+        except Exception as e:
+            logger.warning(f"Error in workload historical analysis: {e}")
+            # Fallback to individual pod analysis
+            for pod in pods:
+                try:
+                    pod_historical = await self.historical_analysis.analyze_pod_historical_usage(
+                        pod, time_range
+                    )
+                    all_validations.extend(pod_historical)
+                except Exception as pod_e:
+                    logger.warning(f"Error in historical analysis for pod {pod.name}: {pod_e}")
+
+        return all_validations
+
     def _validate_container_resources(
         self,
         pod_name: str,
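
One review-style note on _analyze_workload_metrics(): the six PromQL strings differ only in the inner metric selector, so a small builder would remove most of the duplication. A sketch under that assumption (helper names hypothetical, not part of this commit):

def _workload_join(namespace: str, workload_name: str) -> str:
    """The ownership join shared by all six queries."""
    return f'''
        * on(namespace,pod)
        group_left(workload, workload_type)
        namespace_workload_pod:kube_pod_owner:relabel{{
            cluster="",
            namespace="{namespace}",
            workload="{workload_name}",
            workload_type=~".+"
        }}
    '''

def build_workload_query(inner_selector: str, namespace: str, workload_name: str) -> str:
    """Wrap a per-pod metric selector in the shared workload aggregation."""
    return (
        f"sum({inner_selector}{_workload_join(namespace, workload_name)}"
        ") by (workload, workload_type)"
    )

# e.g. the CPU-requests query from the diff, rebuilt ("demo"/"web" are placeholders):
cpu_requests_query = build_workload_query(
    'kube_pod_container_resource_requests{resource="cpu", namespace="demo"}',
    namespace="demo",
    workload_name="web",
)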