Files
openshift-resource-governance/app/services/validation_service.py

368 lines
15 KiB
Python

"""
Resource validation service following Red Hat best practices
"""
import logging
from typing import List, Dict, Any
from decimal import Decimal, InvalidOperation
import re
from app.models.resource_models import PodResource, ResourceValidation, NamespaceResources
from app.core.config import settings
from app.services.historical_analysis import HistoricalAnalysisService
logger = logging.getLogger(__name__)
class ValidationService:
"""Service for resource validation"""
def __init__(self):
self.cpu_ratio = settings.cpu_limit_ratio
self.memory_ratio = settings.memory_limit_ratio
self.min_cpu_request = settings.min_cpu_request
self.min_memory_request = settings.min_memory_request
self.historical_analysis = HistoricalAnalysisService()
def validate_pod_resources(self, pod: PodResource) -> List[ResourceValidation]:
"""Validate pod resources"""
validations = []
for container in pod.containers:
container_validations = self._validate_container_resources(
pod.name, pod.namespace, container
)
validations.extend(container_validations)
return validations
async def validate_pod_resources_with_historical_analysis(
self,
pod: PodResource,
time_range: str = '24h'
) -> List[ResourceValidation]:
"""Validate pod resources including historical analysis"""
# Static validations
static_validations = self.validate_pod_resources(pod)
# Historical analysis
try:
historical_validations = await self.historical_analysis.analyze_pod_historical_usage(
pod, time_range
)
static_validations.extend(historical_validations)
except Exception as e:
logger.warning(f"Error in historical analysis for pod {pod.name}: {e}")
return static_validations
def _validate_container_resources(
self,
pod_name: str,
namespace: str,
container: Dict[str, Any]
) -> List[ResourceValidation]:
"""Validate container resources"""
validations = []
resources = container.get("resources", {})
requests = resources.get("requests", {})
limits = resources.get("limits", {})
# 1. Check if requests are defined
if not requests:
validations.append(ResourceValidation(
pod_name=pod_name,
namespace=namespace,
container_name=container["name"],
validation_type="missing_requests",
severity="error",
message="Container without defined requests",
recommendation="Define CPU and memory requests to guarantee QoS"
))
# 2. Check if limits are defined
if not limits:
validations.append(ResourceValidation(
pod_name=pod_name,
namespace=namespace,
container_name=container["name"],
validation_type="missing_limits",
severity="warning",
message="Container without defined limits",
recommendation="Define limits to avoid excessive resource consumption"
))
# 3. Validate limit:request ratio
if requests and limits:
cpu_validation = self._validate_cpu_ratio(
pod_name, namespace, container["name"], requests, limits
)
if cpu_validation:
validations.append(cpu_validation)
memory_validation = self._validate_memory_ratio(
pod_name, namespace, container["name"], requests, limits
)
if memory_validation:
validations.append(memory_validation)
# 4. Validate minimum values
if requests:
min_validation = self._validate_minimum_values(
pod_name, namespace, container["name"], requests
)
validations.extend(min_validation)
return validations
def _validate_cpu_ratio(
self,
pod_name: str,
namespace: str,
container_name: str,
requests: Dict[str, str],
limits: Dict[str, str]
) -> ResourceValidation:
"""Validate CPU limit:request ratio"""
if "cpu" not in requests or "cpu" not in limits:
return None
try:
request_value = self._parse_cpu_value(requests["cpu"])
limit_value = self._parse_cpu_value(limits["cpu"])
if request_value > 0:
ratio = limit_value / request_value
if ratio > self.cpu_ratio * 1.5: # 50% de tolerância
return ResourceValidation(
pod_name=pod_name,
namespace=namespace,
container_name=container_name,
validation_type="invalid_ratio",
severity="warning",
message=f"CPU limit:request ratio too high ({ratio:.2f}:1)",
recommendation=f"Consider reducing limits or increasing requests (recommended ratio: {self.cpu_ratio}:1)"
)
elif ratio < 1.0:
return ResourceValidation(
pod_name=pod_name,
namespace=namespace,
container_name=container_name,
validation_type="invalid_ratio",
severity="error",
message=f"CPU limit less than request ({ratio:.2f}:1)",
recommendation="CPU limit should be greater than or equal to request"
)
except (ValueError, InvalidOperation) as e:
logger.warning(f"Error validating CPU ratio: {e}")
return None
def _validate_memory_ratio(
self,
pod_name: str,
namespace: str,
container_name: str,
requests: Dict[str, str],
limits: Dict[str, str]
) -> ResourceValidation:
"""Validate memory limit:request ratio"""
if "memory" not in requests or "memory" not in limits:
return None
try:
request_value = self._parse_memory_value(requests["memory"])
limit_value = self._parse_memory_value(limits["memory"])
if request_value > 0:
ratio = limit_value / request_value
if ratio > self.memory_ratio * 1.5: # 50% de tolerância
return ResourceValidation(
pod_name=pod_name,
namespace=namespace,
container_name=container_name,
validation_type="invalid_ratio",
severity="warning",
message=f"Memory limit:request ratio too high ({ratio:.2f}:1)",
recommendation=f"Consider reducing limits or increasing requests (recommended ratio: {self.memory_ratio}:1)"
)
elif ratio < 1.0:
return ResourceValidation(
pod_name=pod_name,
namespace=namespace,
container_name=container_name,
validation_type="invalid_ratio",
severity="error",
message=f"Memory limit less than request ({ratio:.2f}:1)",
recommendation="Memory limit should be greater than or equal to request"
)
except (ValueError, InvalidOperation) as e:
logger.warning(f"Error validating memory ratio: {e}")
return None
def _validate_minimum_values(
self,
pod_name: str,
namespace: str,
container_name: str,
requests: Dict[str, str]
) -> List[ResourceValidation]:
"""Validate minimum request values"""
validations = []
# Validate minimum CPU
if "cpu" in requests:
try:
request_value = self._parse_cpu_value(requests["cpu"])
min_value = self._parse_cpu_value(self.min_cpu_request)
if request_value < min_value:
validations.append(ResourceValidation(
pod_name=pod_name,
namespace=namespace,
container_name=container_name,
validation_type="minimum_value",
severity="warning",
message=f"CPU request too low ({requests['cpu']})",
recommendation=f"Consider increasing to at least {self.min_cpu_request}"
))
except (ValueError, InvalidOperation):
pass
# Validate minimum memory
if "memory" in requests:
try:
request_value = self._parse_memory_value(requests["memory"])
min_value = self._parse_memory_value(self.min_memory_request)
if request_value < min_value:
validations.append(ResourceValidation(
pod_name=pod_name,
namespace=namespace,
container_name=container_name,
validation_type="minimum_value",
severity="warning",
message=f"Memory request too low ({requests['memory']})",
recommendation=f"Consider increasing to at least {self.min_memory_request}"
))
except (ValueError, InvalidOperation):
pass
return validations
def _parse_cpu_value(self, value: str) -> float:
"""Convert CPU value to float (cores)"""
if value.endswith('m'):
return float(value[:-1]) / 1000
elif value.endswith('n'):
return float(value[:-1]) / 1000000000
else:
return float(value)
def _parse_memory_value(self, value: str) -> int:
"""Convert memory value to bytes"""
value = value.upper()
if value.endswith('KI'):
return int(float(value[:-2]) * 1024)
elif value.endswith('MI'):
return int(float(value[:-2]) * 1024 * 1024)
elif value.endswith('GI'):
return int(float(value[:-2]) * 1024 * 1024 * 1024)
elif value.endswith('K'):
return int(float(value[:-1]) * 1000)
elif value.endswith('M'):
return int(float(value[:-1]) * 1000 * 1000)
elif value.endswith('G'):
return int(float(value[:-1]) * 1000 * 1000 * 1000)
else:
return int(value)
def validate_namespace_overcommit(
self,
namespace_resources: NamespaceResources,
node_capacity: Dict[str, str]
) -> List[ResourceValidation]:
"""Validate overcommit in a namespace"""
validations = []
# Calculate total namespace requests
total_cpu_requests = self._parse_cpu_value(namespace_resources.total_cpu_requests)
total_memory_requests = self._parse_memory_value(namespace_resources.total_memory_requests)
# Calculate total node capacity
total_cpu_capacity = self._parse_cpu_value(node_capacity.get("cpu", "0"))
total_memory_capacity = self._parse_memory_value(node_capacity.get("memory", "0"))
# Check CPU overcommit
if total_cpu_capacity > 0:
cpu_utilization = (total_cpu_requests / total_cpu_capacity) * 100
if cpu_utilization > 100:
validations.append(ResourceValidation(
pod_name="namespace",
namespace=namespace_resources.name,
container_name="all",
validation_type="overcommit",
severity="critical",
message=f"CPU overcommit in namespace: {cpu_utilization:.1f}%",
recommendation="Reduce CPU requests or add more nodes to the cluster"
))
# Check memory overcommit
if total_memory_capacity > 0:
memory_utilization = (total_memory_requests / total_memory_capacity) * 100
if memory_utilization > 100:
validations.append(ResourceValidation(
pod_name="namespace",
namespace=namespace_resources.name,
container_name="all",
validation_type="overcommit",
severity="critical",
message=f"Memory overcommit in namespace: {memory_utilization:.1f}%",
recommendation="Reduce memory requests or add more nodes to the cluster"
))
return validations
def generate_recommendations(self, validations: List[ResourceValidation]) -> List[str]:
"""Generate recommendations based on validations"""
recommendations = []
# Group validations by type
validation_counts = {}
for validation in validations:
validation_type = validation.validation_type
if validation_type not in validation_counts:
validation_counts[validation_type] = 0
validation_counts[validation_type] += 1
# Generate recommendations based on found issues
if validation_counts.get("missing_requests", 0) > 0:
recommendations.append(
f"Implement LimitRange in namespace to define default requests "
f"({validation_counts['missing_requests']} containers without requests)"
)
if validation_counts.get("missing_limits", 0) > 0:
recommendations.append(
f"Define limits for {validation_counts['missing_limits']} containers "
"to avoid excessive resource consumption"
)
if validation_counts.get("invalid_ratio", 0) > 0:
recommendations.append(
f"Adjust limit:request ratio for {validation_counts['invalid_ratio']} containers "
f"(recommended: {self.cpu_ratio}:1)"
)
if validation_counts.get("overcommit", 0) > 0:
recommendations.append(
f"Resolve overcommit in {validation_counts['overcommit']} namespaces "
"to avoid performance issues"
)
return recommendations