"""
Thanos client for historical data queries and aggregations.

Complements PrometheusClient for long-term data analysis.
"""
import json
import logging
import os
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple

import requests
import urllib3

logger = logging.getLogger(__name__)


class ThanosClient:
    """
    Client for querying Thanos (OpenShift's historical metrics store).

    Used for historical data, trends, and complex aggregations. All query
    methods are best-effort: transport, HTTP and decoding failures are logged
    and reported as an error dictionary instead of being raised.
    """

    # Fallback endpoint used when neither the constructor argument nor the
    # THANOS_URL environment variable provides one.
    DEFAULT_URL = 'http://thanos-query:9090'
    # Location of the pod's mounted service account token.
    TOKEN_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/token'
    # Per-request timeout in seconds.
    DEFAULT_TIMEOUT = 30

    def __init__(self, thanos_url: Optional[str] = None,
                 timeout: float = DEFAULT_TIMEOUT):
        """
        Initialize Thanos client.

        Args:
            thanos_url: Thanos query endpoint URL. When omitted, the
                THANOS_URL environment variable (or the built-in default)
                is used.
            timeout: Seconds to wait for each HTTP request.
        """
        self.thanos_url = thanos_url or self._get_thanos_url()
        # requests.Session has no session-wide timeout setting, so the value
        # is kept here and passed explicitly on every request (see _get).
        self.timeout = timeout
        self.session = requests.Session()

        # NOTE(security): certificate verification is disabled to allow
        # self-signed certificates; the bearer token is therefore sent over
        # an unverified TLS connection. Review before using outside a
        # trusted cluster network.
        self.session.verify = False
        # Silence the warning urllib3 would otherwise emit on every request.
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        # Add service account token for authentication
        self._add_auth_token()

    def _get_thanos_url(self) -> str:
        """Get Thanos URL from the THANOS_URL environment variable or use the default."""
        return os.getenv('THANOS_URL', self.DEFAULT_URL)

    def _add_auth_token(self):
        """
        Add the service account token to the session as a Bearer header.

        A missing, unreadable or empty token file is not fatal: a warning is
        logged and the client proceeds without authentication.
        """
        try:
            with open(self.TOKEN_PATH, 'r') as f:
                token = f.read().strip()
        except FileNotFoundError:
            logger.warning("Service account token not found, proceeding without authentication")
            return
        except OSError as e:
            # e.g. permission denied: keep the constructor from crashing.
            logger.warning("Service account token could not be read (%s), "
                           "proceeding without authentication", e)
            return

        if not token:
            logger.warning("Service account token is empty, proceeding without authentication")
            return

        self.session.headers.update({
            'Authorization': f'Bearer {token}'
        })

    @staticmethod
    def _time_window(days: int) -> Tuple[datetime, datetime]:
        """
        Return (start, end) as timezone-aware UTC datetimes spanning the last `days` days.

        Aware datetimes are required so that isoformat() includes a UTC
        offset; the query API rejects timestamps without one.
        """
        end_time = datetime.now(timezone.utc)
        return end_time - timedelta(days=days), end_time

    @staticmethod
    def _escape_label_value(value: str) -> str:
        """Escape a value for use inside a double-quoted PromQL label matcher."""
        return (
            str(value)
            .replace('\\', '\\\\')
            .replace('"', '\\"')
            .replace('\n', '\\n')
        )

    def _get(self, path: str, params: Dict[str, Any]) -> requests.Response:
        """
        Issue a GET against the Thanos API with the client's timeout.

        Raises:
            requests.RequestException: on transport failures or non-2xx status.
        """
        response = self.session.get(
            f"{self.thanos_url.rstrip('/')}{path}",
            params=params,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response

    def query(self, query: str, time: str = None) -> Dict[str, Any]:
        """
        Execute instant query against Thanos.

        Args:
            query: PromQL query
            time: Evaluation timestamp, RFC3339 or Unix seconds (default: now)

        Returns:
            Decoded JSON response, or {'status': 'error', 'error': <message>}
            if the request failed.
        """
        params = {'query': query}
        if time:
            params['time'] = time

        try:
            return self._get('/api/v1/query', params).json()
        except Exception as e:
            # Deliberate catch-all boundary: callers rely on an error dict
            # rather than an exception.
            logger.error("Thanos instant query failed: %s", e)
            return {'status': 'error', 'error': str(e)}

    def query_range(self, query: str, start: str, end: str, step: str = "1h") -> Dict[str, Any]:
        """
        Execute range query against Thanos.

        Args:
            query: PromQL query
            start: Start timestamp, RFC3339 (with UTC offset) or Unix seconds
            end: End timestamp, RFC3339 (with UTC offset) or Unix seconds
            step: Query resolution step width (duration such as "1h", or seconds)

        Returns:
            Decoded JSON response, or {'status': 'error', 'error': <message>}
            if the request failed.
        """
        params = {
            'query': query,
            'start': start,
            'end': end,
            'step': step
        }

        try:
            return self._get('/api/v1/query_range', params).json()
        except Exception as e:
            # Deliberate catch-all boundary: callers rely on an error dict
            # rather than an exception.
            logger.error("Thanos range query failed: %s", e)
            return {'status': 'error', 'error': str(e)}

    def get_cluster_capacity_historical(self, days: int = 7) -> Dict[str, Any]:
        """
        Get historical cluster capacity data.

        Args:
            days: Number of days to look back

        Returns:
            Range query result for the capacity expression at 1h resolution
        """
        start_time, end_time = self._time_window(days)

        # Query for cluster capacity over time
        query = """
        max(
            kube_node_status_capacity{resource="cpu"} * on(node) group_left() kube_node_status_allocatable{resource="cpu"}
        ) by (cluster)
        """

        return self.query_range(
            query=query,
            start=start_time.isoformat(),
            end=end_time.isoformat(),
            step="1h"
        )

    def get_resource_utilization_trend(self, days: int = 7) -> Dict[str, Any]:
        """
        Get historical resource utilization trends.

        Args:
            days: Number of days to look back

        Returns:
            Dictionary with 'cpu_trend' and 'memory_trend' range query
            results, the 'period' label, and the ISO-formatted UTC
            'start_time' / 'end_time' of the window.
        """
        start_time, end_time = self._time_window(days)
        start_iso, end_iso = start_time.isoformat(), end_time.isoformat()

        # CPU utilization trend
        cpu_query = """
        avg(
            rate(container_cpu_usage_seconds_total{container!="POD",container!=""}[5m])
        ) by (cluster)
        """

        # Memory utilization trend
        memory_query = """
        avg(
            container_memory_working_set_bytes{container!="POD",container!=""}
        ) by (cluster)
        """

        cpu_data = self.query_range(
            query=cpu_query, start=start_iso, end=end_iso, step="1h"
        )
        memory_data = self.query_range(
            query=memory_query, start=start_iso, end=end_iso, step="1h"
        )

        return {
            'cpu_trend': cpu_data,
            'memory_trend': memory_data,
            'period': f"{days} days",
            'start_time': start_iso,
            'end_time': end_iso
        }

    def get_namespace_resource_trends(self, namespace: str, days: int = 7) -> Dict[str, Any]:
        """
        Get historical resource trends for a specific namespace.

        Args:
            namespace: Namespace name
            days: Number of days to look back

        Returns:
            Dictionary with the namespace, 'cpu_requests_trend' and
            'memory_requests_trend' range query results, and the 'period' label.
        """
        start_time, end_time = self._time_window(days)
        start_iso, end_iso = start_time.isoformat(), end_time.isoformat()

        # The namespace ends up inside a quoted label matcher; escape it so
        # quotes or backslashes cannot break out of the string.
        ns_label = self._escape_label_value(namespace)

        # CPU requests trend
        cpu_requests_query = f"""
        sum(
            kube_pod_container_resource_requests{{namespace="{ns_label}", resource="cpu"}}
        ) by (namespace)
        """

        # Memory requests trend
        memory_requests_query = f"""
        sum(
            kube_pod_container_resource_requests{{namespace="{ns_label}", resource="memory"}}
        ) by (namespace)
        """

        cpu_requests = self.query_range(
            query=cpu_requests_query, start=start_iso, end=end_iso, step="1h"
        )
        memory_requests = self.query_range(
            query=memory_requests_query, start=start_iso, end=end_iso, step="1h"
        )

        return {
            'namespace': namespace,
            'cpu_requests_trend': cpu_requests,
            'memory_requests_trend': memory_requests,
            'period': f"{days} days"
        }

    def get_overcommit_historical(self, days: int = 7) -> Dict[str, Any]:
        """
        Get historical overcommit data.

        Args:
            days: Number of days to look back

        Returns:
            Dictionary with 'cpu_overcommit_trend' and
            'memory_overcommit_trend' range query results (requests as a
            percentage of allocatable) and the 'period' label.
        """
        start_time, end_time = self._time_window(days)
        start_iso, end_iso = start_time.isoformat(), end_time.isoformat()

        # CPU overcommit trend
        cpu_overcommit_query = """
        (
            sum(kube_pod_container_resource_requests{resource="cpu"})
            /
            sum(kube_node_status_allocatable{resource="cpu"})
        ) * 100
        """

        # Memory overcommit trend
        memory_overcommit_query = """
        (
            sum(kube_pod_container_resource_requests{resource="memory"})
            /
            sum(kube_node_status_allocatable{resource="memory"})
        ) * 100
        """

        cpu_overcommit = self.query_range(
            query=cpu_overcommit_query, start=start_iso, end=end_iso, step="1h"
        )
        memory_overcommit = self.query_range(
            query=memory_overcommit_query, start=start_iso, end=end_iso, step="1h"
        )

        return {
            'cpu_overcommit_trend': cpu_overcommit,
            'memory_overcommit_trend': memory_overcommit,
            'period': f"{days} days"
        }

    def get_top_workloads_historical(self, days: int = 7, limit: int = 10) -> Dict[str, Any]:
        """
        Get historical top workloads by resource usage.

        Args:
            days: Number of days to look back
            limit: Number of top workloads to return

        Returns:
            Dictionary with 'top_cpu_workloads' and 'top_memory_workloads'
            range query results, the 'period' label and the 'limit' used.
        """
        start_time, end_time = self._time_window(days)
        start_iso, end_iso = start_time.isoformat(), end_time.isoformat()

        # Top CPU consuming workloads
        cpu_query = f"""
        topk({limit},
            avg_over_time(
                rate(container_cpu_usage_seconds_total{{container!="POD",container!=""}}[5m])[1h:1h]
            )
        ) by (namespace, pod, container)
        """

        # Top Memory consuming workloads
        memory_query = f"""
        topk({limit},
            avg_over_time(
                container_memory_working_set_bytes{{container!="POD",container!=""}}[1h:1h]
            )
        ) by (namespace, pod, container)
        """

        cpu_workloads = self.query_range(
            query=cpu_query, start=start_iso, end=end_iso, step="1h"
        )
        memory_workloads = self.query_range(
            query=memory_query, start=start_iso, end=end_iso, step="1h"
        )

        return {
            'top_cpu_workloads': cpu_workloads,
            'top_memory_workloads': memory_workloads,
            'period': f"{days} days",
            'limit': limit
        }

    def health_check(self) -> Dict[str, Any]:
        """
        Check Thanos connectivity and health.

        Returns:
            {'status': 'healthy', 'thanos_url', 'response_time'} when a simple
            `up` query succeeds, otherwise
            {'status': 'unhealthy', 'thanos_url', 'error'}.
        """
        try:
            # Use a simple query endpoint instead of status/config
            response = self._get('/api/v1/query', {'query': 'up'})
        except Exception as e:
            logger.error("Thanos health check failed: %s", e)
            return {
                'status': 'unhealthy',
                'thanos_url': self.thanos_url,
                'error': str(e)
            }

        return {
            'status': 'healthy',
            'thanos_url': self.thanos_url,
            'response_time': response.elapsed.total_seconds()
        }