""" Optimized Prometheus Client for ORU Analyzer Implements aggregated queries and intelligent caching for 10x performance improvement """ import asyncio import logging import time from datetime import datetime, timedelta from typing import Dict, List, Optional, Any, Tuple from dataclasses import dataclass import aiohttp import json logger = logging.getLogger(__name__) @dataclass class WorkloadMetrics: """Workload metrics data structure""" workload_name: str namespace: str cpu_usage_cores: float cpu_usage_percent: float cpu_requests_cores: float cpu_requests_percent: float cpu_limits_cores: float cpu_limits_percent: float memory_usage_bytes: float memory_usage_mb: float memory_usage_percent: float memory_requests_bytes: float memory_requests_mb: float memory_requests_percent: float memory_limits_bytes: float memory_limits_mb: float memory_limits_percent: float cpu_efficiency_percent: float memory_efficiency_percent: float timestamp: datetime @dataclass class ClusterMetrics: """Cluster total resources""" cpu_cores_total: float memory_bytes_total: float memory_gb_total: float class PrometheusCache: """Intelligent caching system for Prometheus queries""" def __init__(self, ttl_seconds: int = 300): # 5 minutes default self.cache: Dict[str, Tuple[Any, float]] = {} self.ttl_seconds = ttl_seconds self.hit_count = 0 self.miss_count = 0 def _generate_cache_key(self, query: str, time_range: str, namespace: str = None) -> str: """Generate cache key for query""" key_parts = [query, time_range] if namespace: key_parts.append(namespace) return "|".join(key_parts) def get(self, query: str, time_range: str, namespace: str = None) -> Optional[Any]: """Get cached result""" key = self._generate_cache_key(query, time_range, namespace) if key in self.cache: data, timestamp = self.cache[key] if time.time() - timestamp < self.ttl_seconds: self.hit_count += 1 logger.debug(f"Cache HIT for key: {key[:50]}...") return data self.miss_count += 1 logger.debug(f"Cache MISS for key: {key[:50]}...") return None def set(self, query: str, time_range: str, data: Any, namespace: str = None): """Set cached result""" key = self._generate_cache_key(query, time_range, namespace) self.cache[key] = (data, time.time()) logger.debug(f"Cache SET for key: {key[:50]}...") def clear(self): """Clear all cached data""" self.cache.clear() self.hit_count = 0 self.miss_count = 0 logger.info("Cache cleared") def get_stats(self) -> Dict[str, Any]: """Get cache statistics""" total_requests = self.hit_count + self.miss_count hit_rate = (self.hit_count / total_requests * 100) if total_requests > 0 else 0 return { "hit_count": self.hit_count, "miss_count": self.miss_count, "hit_rate_percent": round(hit_rate, 2), "cached_queries": len(self.cache), "ttl_seconds": self.ttl_seconds } class OptimizedPrometheusClient: """Optimized Prometheus client with aggregated queries and caching""" def __init__(self, prometheus_url: str, token: str = None, cache_ttl: int = 300): self.prometheus_url = prometheus_url.rstrip('/') self.token = token self.cache = PrometheusCache(ttl_seconds=cache_ttl) self.session = None async def __aenter__(self): """Async context manager entry""" self.session = aiohttp.ClientSession() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit""" if self.session: await self.session.close() async def _make_request(self, query: str) -> Dict[str, Any]: """Make HTTP request to Prometheus""" if not self.session: raise RuntimeError("Client not initialized. Use async context manager.") url = f"{self.prometheus_url}/api/v1/query" headers = {"Content-Type": "application/json"} if self.token: headers["Authorization"] = f"Bearer {self.token}" params = {"query": query} try: async with self.session.get(url, headers=headers, params=params, ssl=False) as response: response.raise_for_status() return await response.json() except Exception as e: logger.error(f"Prometheus query failed: {e}") raise def _calculate_step(self, time_range: str) -> str: """Calculate appropriate step based on time range""" if time_range == "1h": return "1m" elif time_range == "6h": return "5m" elif time_range == "24h": return "15m" elif time_range == "7d": return "1h" else: return "5m" async def get_cluster_totals(self) -> ClusterMetrics: """Get cluster total resources in a single query""" cache_key = "cluster_totals" cached_result = self.cache.get(cache_key, "1h") if cached_result: return ClusterMetrics(**cached_result) # Single aggregated query for cluster totals cluster_query = """ { cpu_cores: sum(kube_node_status_allocatable{resource="cpu"}), memory_bytes: sum(kube_node_status_allocatable{resource="memory"}) } """ try: result = await self._make_request(cluster_query) if result.get("status") == "success" and result.get("data", {}).get("result"): data = result["data"]["result"][0] cpu_cores = float(data["value"][1]) memory_bytes = float(data["value"][1]) cluster_metrics = ClusterMetrics( cpu_cores_total=cpu_cores, memory_bytes_total=memory_bytes, memory_gb_total=memory_bytes / (1024**3) ) # Cache the result self.cache.set(cache_key, "1h", cluster_metrics.__dict__) return cluster_metrics else: raise Exception("Failed to get cluster totals from Prometheus") except Exception as e: logger.error(f"Error getting cluster totals: {e}") # Return default values if Prometheus is unavailable return ClusterMetrics( cpu_cores_total=0, memory_bytes_total=0, memory_gb_total=0 ) async def get_all_workloads_metrics(self, namespace: str, time_range: str = "24h") -> List[WorkloadMetrics]: """Get metrics for ALL workloads in a single aggregated query""" cache_key = f"workloads_metrics_{namespace}" cached_result = self.cache.get(cache_key, time_range, namespace) if cached_result: return [WorkloadMetrics(**item) for item in cached_result] try: # Get cluster totals first cluster_metrics = await self.get_cluster_totals() # Single aggregated query for all workloads aggregated_query = f""" {{ cpu_usage: sum by (workload, workload_type) ( node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{{ cluster="", namespace="{namespace}" }} * on(namespace,pod) group_left(workload, workload_type) namespace_workload_pod:kube_pod_owner:relabel{{ cluster="", namespace="{namespace}", workload_type=~".+" }} ), memory_usage: sum by (workload, workload_type) ( container_memory_working_set_bytes{{ cluster="", namespace="{namespace}", container!="", image!="" }} * on(namespace,pod) group_left(workload, workload_type) namespace_workload_pod:kube_pod_owner:relabel{{ cluster="", namespace="{namespace}", workload_type=~".+" }} ), cpu_requests: sum by (workload, workload_type) ( kube_pod_container_resource_requests{{ job="kube-state-metrics", cluster="", namespace="{namespace}", resource="cpu" }} * on(namespace,pod) group_left(workload, workload_type) namespace_workload_pod:kube_pod_owner:relabel{{ cluster="", namespace="{namespace}", workload_type=~".+" }} ), memory_requests: sum by (workload, workload_type) ( kube_pod_container_resource_requests{{ job="kube-state-metrics", cluster="", namespace="{namespace}", resource="memory" }} * on(namespace,pod) group_left(workload, workload_type) namespace_workload_pod:kube_pod_owner:relabel{{ cluster="", namespace="{namespace}", workload_type=~".+" }} ), cpu_limits: sum by (workload, workload_type) ( kube_pod_container_resource_limits{{ job="kube-state-metrics", cluster="", namespace="{namespace}", resource="cpu" }} * on(namespace,pod) group_left(workload, workload_type) namespace_workload_pod:kube_pod_owner:relabel{{ cluster="", namespace="{namespace}", workload_type=~".+" }} ), memory_limits: sum by (workload, workload_type) ( kube_pod_container_resource_limits{{ job="kube-state-metrics", cluster="", namespace="{namespace}", resource="memory" }} * on(namespace,pod) group_left(workload, workload_type) namespace_workload_pod:kube_pod_owner:relabel{{ cluster="", namespace="{namespace}", workload_type=~".+" }} ) }} """ result = await self._make_request(aggregated_query) if result.get("status") != "success": raise Exception(f"Prometheus query failed: {result.get('error', 'Unknown error')}") # Process aggregated results workloads_data = {} data = result.get("data", {}).get("result", []) for item in data: metric_name = item["metric"].get("__name__", "") workload = item["metric"].get("workload", "unknown") value = float(item["value"][1]) if workload not in workloads_data: workloads_data[workload] = { "workload_name": workload, "namespace": namespace, "cpu_usage_cores": 0, "memory_usage_bytes": 0, "cpu_requests_cores": 0, "memory_requests_bytes": 0, "cpu_limits_cores": 0, "memory_limits_bytes": 0 } if "cpu_usage" in metric_name: workloads_data[workload]["cpu_usage_cores"] = value elif "memory_usage" in metric_name: workloads_data[workload]["memory_usage_bytes"] = value elif "cpu_requests" in metric_name: workloads_data[workload]["cpu_requests_cores"] = value elif "memory_requests" in metric_name: workloads_data[workload]["memory_requests_bytes"] = value elif "cpu_limits" in metric_name: workloads_data[workload]["cpu_limits_cores"] = value elif "memory_limits" in metric_name: workloads_data[workload]["memory_limits_bytes"] = value # Convert to WorkloadMetrics objects with calculations workloads_metrics = [] for workload_data in workloads_data.values(): # Calculate percentages cpu_usage_percent = (workload_data["cpu_usage_cores"] / cluster_metrics.cpu_cores_total * 100) if cluster_metrics.cpu_cores_total > 0 else 0 memory_usage_percent = (workload_data["memory_usage_bytes"] / cluster_metrics.memory_bytes_total * 100) if cluster_metrics.memory_bytes_total > 0 else 0 cpu_requests_percent = (workload_data["cpu_requests_cores"] / cluster_metrics.cpu_cores_total * 100) if cluster_metrics.cpu_cores_total > 0 else 0 memory_requests_percent = (workload_data["memory_requests_bytes"] / cluster_metrics.memory_bytes_total * 100) if cluster_metrics.memory_bytes_total > 0 else 0 cpu_limits_percent = (workload_data["cpu_limits_cores"] / cluster_metrics.cpu_cores_total * 100) if cluster_metrics.cpu_cores_total > 0 else 0 memory_limits_percent = (workload_data["memory_limits_bytes"] / cluster_metrics.memory_bytes_total * 100) if cluster_metrics.memory_bytes_total > 0 else 0 # Calculate efficiency cpu_efficiency = (workload_data["cpu_usage_cores"] / workload_data["cpu_requests_cores"] * 100) if workload_data["cpu_requests_cores"] > 0 else 0 memory_efficiency = (workload_data["memory_usage_bytes"] / workload_data["memory_requests_bytes"] * 100) if workload_data["memory_requests_bytes"] > 0 else 0 workload_metrics = WorkloadMetrics( workload_name=workload_data["workload_name"], namespace=namespace, cpu_usage_cores=workload_data["cpu_usage_cores"], cpu_usage_percent=round(cpu_usage_percent, 2), cpu_requests_cores=workload_data["cpu_requests_cores"], cpu_requests_percent=round(cpu_requests_percent, 2), cpu_limits_cores=workload_data["cpu_limits_cores"], cpu_limits_percent=round(cpu_limits_percent, 2), memory_usage_bytes=workload_data["memory_usage_bytes"], memory_usage_mb=round(workload_data["memory_usage_bytes"] / (1024**2), 2), memory_usage_percent=round(memory_usage_percent, 2), memory_requests_bytes=workload_data["memory_requests_bytes"], memory_requests_mb=round(workload_data["memory_requests_bytes"] / (1024**2), 2), memory_requests_percent=round(memory_requests_percent, 2), memory_limits_bytes=workload_data["memory_limits_bytes"], memory_limits_mb=round(workload_data["memory_limits_bytes"] / (1024**2), 2), memory_limits_percent=round(memory_limits_percent, 2), cpu_efficiency_percent=round(cpu_efficiency, 1), memory_efficiency_percent=round(memory_efficiency, 1), timestamp=datetime.now() ) workloads_metrics.append(workload_metrics) # Cache the results cache_data = [metrics.__dict__ for metrics in workloads_metrics] self.cache.set(cache_key, time_range, cache_data, namespace) logger.info(f"Retrieved metrics for {len(workloads_metrics)} workloads in namespace {namespace}") return workloads_metrics except Exception as e: logger.error(f"Error getting workload metrics for namespace {namespace}: {e}") return [] async def get_workload_peak_usage(self, namespace: str, workload: str, time_range: str = "7d") -> Dict[str, Any]: """Get peak usage for a specific workload using MAX_OVER_TIME""" cache_key = f"peak_usage_{namespace}_{workload}" cached_result = self.cache.get(cache_key, time_range, namespace) if cached_result: return cached_result try: step = self._calculate_step(time_range) # Peak usage queries using MAX_OVER_TIME peak_queries = { "cpu_peak": f""" max_over_time( sum( node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{{ cluster="", namespace="{namespace}", pod=~"{workload}.*" }} ) [{time_range}:{step}] ) """, "memory_peak": f""" max_over_time( sum( container_memory_working_set_bytes{{ cluster="", namespace="{namespace}", pod=~"{workload}.*", container!="", image!="" }} ) [{time_range}:{step}] ) """ } # Execute queries in parallel tasks = [] for metric_name, query in peak_queries.items(): tasks.append(self._make_request(query)) results = await asyncio.gather(*tasks, return_exceptions=True) peak_data = {} for i, (metric_name, query) in enumerate(peak_queries.items()): if isinstance(results[i], Exception): logger.error(f"Peak query {metric_name} failed: {results[i]}") peak_data[metric_name] = 0 else: result = results[i] if result.get("status") == "success" and result.get("data", {}).get("result"): peak_data[metric_name] = float(result["data"]["result"][0]["value"][1]) else: peak_data[metric_name] = 0 # Cache the result self.cache.set(cache_key, time_range, peak_data, namespace) return peak_data except Exception as e: logger.error(f"Error getting peak usage for {workload} in {namespace}: {e}") return {"cpu_peak": 0, "memory_peak": 0} def get_cache_stats(self) -> Dict[str, Any]: """Get cache statistics""" return self.cache.get_stats() def clear_cache(self): """Clear all cached data""" self.cache.clear()