""" Prometheus client for metrics collection """
import logging
import aiohttp
import asyncio
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta

from app.core.config import settings

logger = logging.getLogger(__name__)


class PrometheusClient:
    """Client for Prometheus interaction.

    The client is best-effort: when Prometheus cannot be reached the
    instance stays in an "uninitialized" state and every query method
    returns an error payload (or an empty list) instead of raising.

    Attributes:
        base_url: Prometheus base URL, read from ``settings.prometheus_url``.
        session: The ``aiohttp.ClientSession`` in use, or ``None``.
        initialized: ``True`` only after a successful connection test.
    """

    # Service account token file used for bearer authentication.
    _TOKEN_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/token'

    # Supported ``time_range`` values for query_range(): (look-back, step).
    _RANGE_WINDOWS = {
        "1h": (timedelta(hours=1), "1m"),
        "6h": (timedelta(hours=6), "5m"),
        "24h": (timedelta(hours=24), "15m"),
        "7d": (timedelta(days=7), "1h"),
    }
    # Window used when ``time_range`` is not one of the keys above.
    _DEFAULT_RANGE = "24h"

    def __init__(self):
        self.base_url = settings.prometheus_url
        self.session = None
        self.initialized = False

    @staticmethod
    def _escape_label_value(value: Any) -> str:
        """Escape *value* for use inside a double-quoted PromQL string."""
        return (
            str(value)
            .replace('\\', '\\\\')
            .replace('"', '\\"')
            .replace('\n', '\\n')
        )

    async def _discard_session(self) -> None:
        """Close the current session (if any) and reset the client state."""
        session, self.session = self.session, None
        self.initialized = False
        if session is not None:
            await session.close()

    async def initialize(self) -> None:
        """Initialize Prometheus client.

        Creates the HTTP session (attaching the service account token as a
        bearer token when the token file is readable) and probes
        ``/api/v1/query?query=up``. ``initialized`` becomes ``True`` only if
        the probe answers with HTTP 200. Errors are logged, never raised.
        """
        # Start from a clean state so a failed re-initialization can never
        # leave a stale ``initialized = True`` behind.
        self.initialized = False
        try:
            # Close a session left over from an earlier initialize() call
            # instead of leaking it.
            await self._discard_session()

            # Create session with SSL verification disabled for self-signed certificates
            connector = aiohttp.TCPConnector(ssl=False)

            # Get service account token for authentication
            token = None
            try:
                with open(self._TOKEN_PATH, 'r') as f:
                    token = f.read().strip()
            except FileNotFoundError:
                logger.warning("Service account token not found, proceeding without authentication")
            except OSError as e:
                # Unreadable token (permissions, I/O error): still proceed
                # unauthenticated rather than aborting initialization.
                logger.warning(f"Service account token unreadable ({e}), proceeding without authentication")

            # Create headers with token if available
            headers = {}
            if token:
                headers['Authorization'] = f'Bearer {token}'

            self.session = aiohttp.ClientSession(connector=connector, headers=headers)

            # Test connection with SSL verification disabled
            async with self.session.get(f"{self.base_url}/api/v1/query?query=up", ssl=False) as response:
                if response.status == 200:
                    self.initialized = True
                    logger.info("Prometheus client initialized successfully")
                else:
                    logger.warning(f"Prometheus returned status {response.status}")
        except Exception as e:
            logger.error(f"Error initializing Prometheus client: {e}")
            # Prometheus may not be available, continue without it
            self.initialized = False

    async def query(self, query: str, time: Optional[datetime] = None) -> Dict[str, Any]:
        """Execute query in Prometheus.

        Args:
            query: PromQL expression.
            time: Optional evaluation time; sent as an integer Unix timestamp.

        Returns:
            The decoded JSON response on HTTP 200, otherwise a dict of the
            form ``{"status": "error", "message": ...}``. Never raises.
        """
        if not self.initialized or not self.session:
            return {"status": "error", "message": "Prometheus not available"}

        try:
            params = {"query": query}
            if time:
                params["time"] = int(time.timestamp())

            async with self.session.get(
                f"{self.base_url}/api/v1/query",
                params=params,
                ssl=False
            ) as response:
                if response.status == 200:
                    data = await response.json()
                    return data
                else:
                    logger.error(f"Error in Prometheus query: {response.status}")
                    return {"status": "error", "message": f"HTTP {response.status}"}
        except Exception as e:
            logger.error(f"Error executing Prometheus query: {e}")
            return {"status": "error", "message": str(e)}

    async def query_range(self, query: str, time_range: str = "24h") -> List[List[float]]:
        """Execute a Prometheus range query.

        Args:
            query: PromQL expression.
            time_range: One of ``"1h"``, ``"6h"``, ``"24h"``, ``"7d"``; any
                other value falls back to the 24-hour window.

        Returns:
            The ``values`` list of the FIRST series in the result only, or
            an empty list when the client is unavailable, the request fails
            or no data is returned. Never raises.
        """
        if not self.initialized or not self.session:
            return []

        try:
            # Calculate time range
            window, step = self._RANGE_WINDOWS.get(
                time_range, self._RANGE_WINDOWS[self._DEFAULT_RANGE]
            )
            end_time = datetime.now()
            start_time = end_time - window

            params = {
                'query': query,
                'start': int(start_time.timestamp()),
                'end': int(end_time.timestamp()),
                'step': step
            }

            async with self.session.get(
                f"{self.base_url}/api/v1/query_range",
                params=params,
                ssl=False
            ) as response:
                if response.status != 200:
                    logger.error(f"Prometheus range query failed: {response.status}")
                    return []

                data = await response.json()
                if data.get("status") == "success" and data.get("data", {}).get("result"):
                    # Extract time series data points
                    result = data["data"]["result"][0]
                    return result.get("values", [])

                logger.warning(f"No data returned for query: {query}")
                return []
        except Exception as e:
            logger.error(f"Error querying Prometheus range: {e}")
            return []

    async def get_pod_cpu_usage(self, namespace: str, pod_name: str) -> Dict[str, Any]:
        """Get CPU usage for a specific pod (5-minute rate)."""
        ns = self._escape_label_value(namespace)
        pod = self._escape_label_value(pod_name)
        query = f'rate(container_cpu_usage_seconds_total{{namespace="{ns}", pod="{pod}"}}[5m])'
        return await self.query(query)

    async def get_pod_memory_usage(self, namespace: str, pod_name: str) -> Dict[str, Any]:
        """Get memory usage (working set bytes) for a specific pod."""
        ns = self._escape_label_value(namespace)
        pod = self._escape_label_value(pod_name)
        query = f'container_memory_working_set_bytes{{namespace="{ns}", pod="{pod}"}}'
        return await self.query(query)

    async def get_namespace_resource_usage(self, namespace: str) -> Dict[str, Any]:
        """Get resource usage of a namespace.

        Returns:
            ``{"cpu": <query result>, "memory": <query result>}``.
        """
        ns = self._escape_label_value(namespace)
        cpu_query = f'sum(rate(container_cpu_usage_seconds_total{{namespace="{ns}"}}[5m]))'
        memory_query = f'sum(container_memory_working_set_bytes{{namespace="{ns}"}})'

        # The two queries are independent and query() never raises, so they
        # can safely run concurrently.
        cpu_result, memory_result = await asyncio.gather(
            self.query(cpu_query),
            self.query(memory_query),
        )

        return {
            "cpu": cpu_result,
            "memory": memory_result
        }

    async def get_cluster_overcommit(self) -> Dict[str, Any]:
        """Check overcommit in cluster.

        Returns:
            For both ``"cpu"`` and ``"memory"``, a dict holding the raw
            ``"capacity"`` and ``"requests"`` query results.
        """
        # CPU overcommit
        cpu_capacity_query = 'sum(kube_node_status_capacity{resource="cpu"})'
        cpu_requests_query = 'sum(kube_pod_container_resource_requests{resource="cpu"})'

        # Memory overcommit
        memory_capacity_query = 'sum(kube_node_status_capacity{resource="memory"})'
        memory_requests_query = 'sum(kube_pod_container_resource_requests{resource="memory"})'

        # Independent queries: issue them concurrently instead of serially.
        cpu_capacity, cpu_requests, memory_capacity, memory_requests = await asyncio.gather(
            self.query(cpu_capacity_query),
            self.query(cpu_requests_query),
            self.query(memory_capacity_query),
            self.query(memory_requests_query),
        )

        return {
            "cpu": {
                "capacity": cpu_capacity,
                "requests": cpu_requests
            },
            "memory": {
                "capacity": memory_capacity,
                "requests": memory_requests
            }
        }

    async def get_node_resource_usage(self) -> Dict[str, Any]:
        """Get resource usage by node.

        Returns:
            The raw result of query() for the combined capacity/requests
            expression (a dict, not a list).
        """
        query = (
            ' ( kube_node_status_capacity{resource="cpu"} or '
            'kube_node_status_capacity{resource="memory"} or '
            'kube_pod_container_resource_requests{resource="cpu"} or '
            'kube_pod_container_resource_requests{resource="memory"} ) '
        )

        result = await self.query(query)
        return result

    async def close(self) -> None:
        """Close HTTP session.

        Also clears ``session`` and ``initialized`` so later queries report
        "Prometheus not available" instead of failing on a closed session.
        Safe to call more than once.
        """
        await self._discard_session()