# openshift-resource-governance/app/core/thanos_client.py
"""
Thanos client for historical data queries and aggregations.
Complements PrometheusClient for long-term data analysis.
"""
import logging
import os
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Union

import requests
import urllib3

logger = logging.getLogger(__name__)
class ThanosClient:
"""
Client for querying Thanos (OpenShift's historical metrics store).
Used for historical data, trends, and complex aggregations.
"""
    def __init__(self, thanos_url: Optional[str] = None):
"""
Initialize Thanos client.
Args:
thanos_url: Thanos query endpoint URL
"""
        self.thanos_url = thanos_url or self._get_thanos_url()
        self.session = requests.Session()
        # requests.Session has no timeout attribute; store the value here and
        # pass it explicitly with every request.
        self.timeout = 30
        # Disable SSL verification for self-signed certificates and silence
        # the resulting warnings
        self.session.verify = False
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        # Add service account token for authentication
        self._add_auth_token()

    def _get_thanos_url(self) -> str:
        """Get the Thanos URL from the environment, or fall back to a default."""
        return os.getenv('THANOS_URL', 'http://thanos-query:9090')
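
    # Note: in a stock OpenShift cluster the in-cluster querier is typically
    # https://thanos-querier.openshift-monitoring.svc:9091 (TLS plus bearer-token
    # auth); the default above assumes a plain thanos-query Service instead.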
def _add_auth_token(self):
"""Add service account token for authentication."""
try:
with open('/var/run/secrets/kubernetes.io/serviceaccount/token', 'r') as f:
token = f.read().strip()
self.session.headers.update({
'Authorization': f'Bearer {token}'
})
except FileNotFoundError:
logger.warning("Service account token not found, proceeding without authentication")

    def query(self, query: str, time: Optional[str] = None) -> Dict[str, Any]:
        """
        Execute an instant query against Thanos.

        Args:
            query: PromQL query
            time: Evaluation timestamp, RFC3339 or Unix (default: now)

        Returns:
            Query result in the Prometheus API response format
        """
try:
params = {'query': query}
if time:
params['time'] = time
            response = self.session.get(
                f"{self.thanos_url}/api/v1/query",
                params=params,
                timeout=self.timeout
            )
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"Thanos instant query failed: {e}")
return {'status': 'error', 'error': str(e)}
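
    # A successful instant query returns the standard Prometheus envelope, e.g.:
    #   {"status": "success",
    #    "data": {"resultType": "vector",
    #             "result": [{"metric": {...}, "value": [<unix_ts>, "<value>"]}]}}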

    def query_range(self, query: str, start: Union[str, float], end: Union[str, float], step: str = "1h") -> Dict[str, Any]:
        """
        Execute a range query against Thanos.

        Args:
            query: PromQL query
            start: Start time (RFC3339 or Unix timestamp)
            end: End time (RFC3339 or Unix timestamp)
            step: Query resolution step width (e.g. "1h")

        Returns:
            Range query result in the Prometheus API response format
        """
try:
params = {
'query': query,
'start': start,
'end': end,
'step': step
}
            response = self.session.get(
                f"{self.thanos_url}/api/v1/query_range",
                params=params,
                timeout=self.timeout
            )
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"Thanos range query failed: {e}")
return {'status': 'error', 'error': str(e)}
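
    # Minimal convenience sketch (not part of the original module): flatten a
    # range-query result into (unix_ts, float_value) pairs across all series.
    # Assumes the standard Prometheus "matrix" response shape.
    @staticmethod
    def _matrix_points(result: Dict[str, Any]) -> List[tuple]:
        series = result.get('data', {}).get('result', [])
        return [(float(ts), float(val))
                for s in series
                for ts, val in s.get('values', [])]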
def get_cluster_capacity_historical(self, days: int = 7) -> Dict[str, Any]:
"""
Get historical cluster capacity data.
Args:
days: Number of days to look back
Returns:
Historical capacity data
"""
end_time = datetime.now()
start_time = end_time - timedelta(days=days)
        # Total schedulable (allocatable) CPU cores in the cluster over time
        query = "sum(kube_node_status_allocatable{resource=\"cpu\"}) by (cluster)"
return self.query_range(
query=query,
start=int(start_time.timestamp()),
end=int(end_time.timestamp()),
step="1h"
)
def get_resource_utilization_trend(self, days: int = 7) -> Dict[str, Any]:
"""
Get historical resource utilization trends.
Args:
days: Number of days to look back
Returns:
Resource utilization trends
"""
end_time = datetime.now()
start_time = end_time - timedelta(days=days)
        # CPU utilization trend: average per-container CPU usage rate, in cores
        cpu_query = "avg(rate(container_cpu_usage_seconds_total{container!=\"POD\",container!=\"\"}[5m])) by (cluster)"
        # Memory utilization trend: average container working-set size, in bytes
        memory_query = "avg(container_memory_working_set_bytes{container!=\"POD\",container!=\"\"}) by (cluster)"
cpu_data = self.query_range(
query=cpu_query,
start=int(start_time.timestamp()),
end=int(end_time.timestamp()),
step="1h"
)
memory_data = self.query_range(
query=memory_query,
start=int(start_time.timestamp()),
end=int(end_time.timestamp()),
step="1h"
)
return {
'cpu_trend': cpu_data,
'memory_trend': memory_data,
'period': f"{days} days",
'start_time': start_time.isoformat(),
'end_time': end_time.isoformat()
}
def get_namespace_resource_trends(self, namespace: str, days: int = 7) -> Dict[str, Any]:
"""
Get historical resource trends for a specific namespace.
Args:
namespace: Namespace name
days: Number of days to look back
Returns:
Namespace resource trends
"""
end_time = datetime.now()
start_time = end_time - timedelta(days=days)
        # CPU requests per namespace (from kube-state-metrics)
        cpu_requests_query = f"sum(kube_pod_container_resource_requests{{namespace=\"{namespace}\", resource=\"cpu\"}}) by (namespace)"
        # Memory requests per namespace (from kube-state-metrics)
        memory_requests_query = f"sum(kube_pod_container_resource_requests{{namespace=\"{namespace}\", resource=\"memory\"}}) by (namespace)"
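        # Caution: the namespace value is interpolated directly into PromQL
        # with no escaping, so it must come from a trusted source.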
cpu_requests = self.query_range(
query=cpu_requests_query,
start=int(start_time.timestamp()),
end=int(end_time.timestamp()),
step="1h"
)
memory_requests = self.query_range(
query=memory_requests_query,
start=int(start_time.timestamp()),
end=int(end_time.timestamp()),
step="1h"
)
return {
'namespace': namespace,
'cpu_requests_trend': cpu_requests,
'memory_requests_trend': memory_requests,
'period': f"{days} days"
}
def get_overcommit_historical(self, days: int = 7) -> Dict[str, Any]:
"""
Get historical overcommit data.
Args:
days: Number of days to look back
Returns:
Historical overcommit data
"""
end_time = datetime.now()
start_time = end_time - timedelta(days=days)
# CPU overcommit trend
cpu_overcommit_query = "(sum(kube_pod_container_resource_requests{resource=\"cpu\"}) / sum(kube_node_status_allocatable{resource=\"cpu\"})) * 100"
# Memory overcommit trend
memory_overcommit_query = "(sum(kube_pod_container_resource_requests{resource=\"memory\"}) / sum(kube_node_status_allocatable{resource=\"memory\"})) * 100"
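        # Readings above 100 mean the summed requests exceed what the nodes
        # can actually schedule, i.e. the cluster is overcommitted.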
cpu_overcommit = self.query_range(
query=cpu_overcommit_query,
start=int(start_time.timestamp()),
end=int(end_time.timestamp()),
step="1h"
)
memory_overcommit = self.query_range(
query=memory_overcommit_query,
start=int(start_time.timestamp()),
end=int(end_time.timestamp()),
step="1h"
)
return {
'cpu_overcommit_trend': cpu_overcommit,
'memory_overcommit_trend': memory_overcommit,
'period': f"{days} days"
}
def get_top_workloads_historical(self, days: int = 7, limit: int = 10) -> Dict[str, Any]:
"""
Get historical top workloads by resource usage.
Args:
days: Number of days to look back
limit: Number of top workloads to return
Returns:
Historical top workloads data
"""
end_time = datetime.now()
start_time = end_time - timedelta(days=days)
        # Top CPU-consuming workloads, aggregated per container
        cpu_query = f"topk({limit}, sum by (namespace, pod, container) (rate(container_cpu_usage_seconds_total{{container!=\"POD\",container!=\"\"}}[1h])))"
        # Top memory-consuming workloads, by hourly average working set
        memory_query = f"topk({limit}, max by (namespace, pod, container) (avg_over_time(container_memory_working_set_bytes{{container!=\"POD\",container!=\"\"}}[1h])))"
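        # In a range query, topk is re-evaluated at every step, so the set of
        # "top" workloads may change across the window.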
cpu_workloads = self.query_range(
query=cpu_query,
start=int(start_time.timestamp()),
end=int(end_time.timestamp()),
step="1h"
)
memory_workloads = self.query_range(
query=memory_query,
start=int(start_time.timestamp()),
end=int(end_time.timestamp()),
step="1h"
)
return {
'top_cpu_workloads': cpu_workloads,
'top_memory_workloads': memory_workloads,
'period': f"{days} days",
'limit': limit
}
def health_check(self) -> Dict[str, Any]:
"""
Check Thanos connectivity and health.
Returns:
Health status
"""
try:
            # Probe with a lightweight instant query instead of a status endpoint
            response = self.session.get(
                f"{self.thanos_url}/api/v1/query",
                params={'query': 'up'},
                timeout=self.timeout
            )
response.raise_for_status()
return {
'status': 'healthy',
'thanos_url': self.thanos_url,
'response_time': response.elapsed.total_seconds()
}
except Exception as e:
logger.error(f"Thanos health check failed: {e}")
return {
'status': 'unhealthy',
'thanos_url': self.thanos_url,
'error': str(e)
}
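

# Minimal usage sketch (assumes a reachable Thanos endpoint, e.g. the
# THANOS_URL default above or an `oc port-forward` to the querier):
if __name__ == "__main__":
    client = ThanosClient()
    print(client.health_check())
    print(client.get_resource_utilization_trend(days=1))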