feat: implement ThanosClient for historical data queries and hybrid Prometheus+Thanos architecture

2025-10-06 12:14:40 -03:00
parent f8aebe9c4c
commit 8c616652af
6 changed files with 482 additions and 0 deletions


@@ -17,6 +17,7 @@ from app.services.report_service import ReportService
from app.services.historical_analysis import HistoricalAnalysisService
from app.services.smart_recommendations import SmartRecommendationsService
from app.core.prometheus_client import PrometheusClient
from app.core.thanos_client import ThanosClient

logger = logging.getLogger(__name__)
@@ -2151,3 +2152,125 @@ async def get_celery_health():
            'error': str(e),
            'timestamp': datetime.now().isoformat()
        }
# ============================================================================
# HYBRID APIs (Prometheus + Thanos)
# ============================================================================

@api_router.get("/hybrid/resource-trends")
async def get_resource_trends(days: int = 7):
    """
    Get resource utilization trends using Thanos for historical data.
    Combines real-time Prometheus data with historical Thanos data.
    """
    try:
        thanos_client = ThanosClient()

        # Get historical trends from Thanos
        trends = thanos_client.get_resource_utilization_trend(days)

        return {
            "data_source": "thanos",
            "period_days": days,
            "trends": trends,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        logger.error(f"Error getting resource trends: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@api_router.get("/hybrid/namespace-trends/{namespace}")
async def get_namespace_trends(namespace: str, days: int = 7):
    """
    Get namespace resource trends using Thanos for historical data.
    """
    try:
        thanos_client = ThanosClient()

        # Get namespace trends from Thanos
        trends = thanos_client.get_namespace_resource_trends(namespace, days)

        return {
            "data_source": "thanos",
            "namespace": namespace,
            "period_days": days,
            "trends": trends,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        logger.error(f"Error getting namespace trends: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@api_router.get("/hybrid/overcommit-trends")
async def get_overcommit_trends(days: int = 7):
    """
    Get overcommit trends using Thanos for historical data.
    """
    try:
        thanos_client = ThanosClient()

        # Get overcommit trends from Thanos
        trends = thanos_client.get_overcommit_historical(days)

        return {
            "data_source": "thanos",
            "period_days": days,
            "trends": trends,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        logger.error(f"Error getting overcommit trends: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@api_router.get("/hybrid/top-workloads")
async def get_top_workloads_historical(days: int = 7, limit: int = 10):
    """
    Get historical top workloads using Thanos.
    """
    try:
        thanos_client = ThanosClient()

        # Get top workloads from Thanos
        workloads = thanos_client.get_top_workloads_historical(days, limit)

        return {
            "data_source": "thanos",
            "period_days": days,
            "limit": limit,
            "workloads": workloads,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        logger.error(f"Error getting top workloads: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@api_router.get("/hybrid/health")
async def get_hybrid_health():
    """
    Get health status of both Prometheus and Thanos.
    """
    try:
        prometheus_client = PrometheusClient()
        thanos_client = ThanosClient()

        # Check both services
        prometheus_health = prometheus_client.health_check()
        thanos_health = thanos_client.health_check()

        return {
            "prometheus": prometheus_health,
            "thanos": thanos_health,
            "overall_status": "healthy" if (
                prometheus_health.get("status") == "healthy" and
                thanos_health.get("status") == "healthy"
            ) else "degraded",
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        logger.error(f"Error checking hybrid health: {e}")
        raise HTTPException(status_code=500, detail=str(e))
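The hybrid endpoints above can be exercised over HTTP once the service is deployed. A minimal sketch, assuming a local port-forward on 8000 and that api_router is mounted under an /api prefix (both assumptions, not part of this diff):

import requests

BASE_URL = "http://localhost:8000/api"  # assumed mount point for api_router

# Historical utilization trend for the last 14 days, served from Thanos
trends = requests.get(f"{BASE_URL}/hybrid/resource-trends", params={"days": 14}, timeout=30)
trends.raise_for_status()
print(trends.json()["data_source"], trends.json()["period_days"])

# Combined Prometheus + Thanos health
health = requests.get(f"{BASE_URL}/hybrid/health", timeout=30).json()
print(health["overall_status"])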

app/core/thanos_client.py (new file, 345 lines)

@@ -0,0 +1,345 @@
"""
Thanos client for historical data queries and aggregations.
Complements PrometheusClient for long-term data analysis.
"""
import requests
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
import json
logger = logging.getLogger(__name__)
class ThanosClient:
"""
Client for querying Thanos (OpenShift's historical metrics store).
Used for historical data, trends, and complex aggregations.
"""
def __init__(self, thanos_url: str = None):
"""
Initialize Thanos client.
Args:
thanos_url: Thanos query endpoint URL
"""
self.thanos_url = thanos_url or self._get_thanos_url()
self.session = requests.Session()
self.session.timeout = 30
def _get_thanos_url(self) -> str:
"""Get Thanos URL from environment or use default."""
import os
return os.getenv('THANOS_URL', 'http://thanos-query:9090')
    def query(self, query: str, time: Optional[str] = None) -> Dict[str, Any]:
        """
        Execute instant query against Thanos.

        Args:
            query: PromQL query
            time: RFC3339 timestamp (default: now)

        Returns:
            Query result
        """
        try:
            params = {'query': query}
            if time:
                params['time'] = time

            response = self.session.get(
                f"{self.thanos_url}/api/v1/query",
                params=params,
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.error(f"Thanos instant query failed: {e}")
            return {'status': 'error', 'error': str(e)}

    def query_range(self, query: str, start: str, end: str, step: str = "1h") -> Dict[str, Any]:
        """
        Execute range query against Thanos.

        Args:
            query: PromQL query
            start: Start time (RFC3339 or Unix timestamp)
            end: End time (RFC3339 or Unix timestamp)
            step: Query resolution step width

        Returns:
            Range query result
        """
        try:
            params = {
                'query': query,
                'start': start,
                'end': end,
                'step': step
            }

            response = self.session.get(
                f"{self.thanos_url}/api/v1/query_range",
                params=params,
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.error(f"Thanos range query failed: {e}")
            return {'status': 'error', 'error': str(e)}
    def get_cluster_capacity_historical(self, days: int = 7) -> Dict[str, Any]:
        """
        Get historical cluster capacity data.

        Args:
            days: Number of days to look back

        Returns:
            Historical capacity data
        """
        end_time = datetime.now()
        start_time = end_time - timedelta(days=days)

        # Query for cluster capacity over time
        query = """
        max(
            kube_node_status_capacity{resource="cpu"} * on(node) group_left()
            kube_node_status_allocatable{resource="cpu"}
        ) by (cluster)
        """

        return self.query_range(
            query=query,
            start=start_time.isoformat(),
            end=end_time.isoformat(),
            step="1h"
        )

    def get_resource_utilization_trend(self, days: int = 7) -> Dict[str, Any]:
        """
        Get historical resource utilization trends.

        Args:
            days: Number of days to look back

        Returns:
            Resource utilization trends
        """
        end_time = datetime.now()
        start_time = end_time - timedelta(days=days)

        # CPU utilization trend
        cpu_query = """
        avg(
            rate(container_cpu_usage_seconds_total{container!="POD",container!=""}[5m])
        ) by (cluster)
        """

        # Memory utilization trend
        memory_query = """
        avg(
            container_memory_working_set_bytes{container!="POD",container!=""}
        ) by (cluster)
        """

        cpu_data = self.query_range(
            query=cpu_query,
            start=start_time.isoformat(),
            end=end_time.isoformat(),
            step="1h"
        )

        memory_data = self.query_range(
            query=memory_query,
            start=start_time.isoformat(),
            end=end_time.isoformat(),
            step="1h"
        )

        return {
            'cpu_trend': cpu_data,
            'memory_trend': memory_data,
            'period': f"{days} days",
            'start_time': start_time.isoformat(),
            'end_time': end_time.isoformat()
        }
    def get_namespace_resource_trends(self, namespace: str, days: int = 7) -> Dict[str, Any]:
        """
        Get historical resource trends for a specific namespace.

        Args:
            namespace: Namespace name
            days: Number of days to look back

        Returns:
            Namespace resource trends
        """
        end_time = datetime.now()
        start_time = end_time - timedelta(days=days)

        # CPU requests trend
        cpu_requests_query = f"""
        sum(
            kube_pod_container_resource_requests{{namespace="{namespace}", resource="cpu"}}
        ) by (namespace)
        """

        # Memory requests trend
        memory_requests_query = f"""
        sum(
            kube_pod_container_resource_requests{{namespace="{namespace}", resource="memory"}}
        ) by (namespace)
        """

        cpu_requests = self.query_range(
            query=cpu_requests_query,
            start=start_time.isoformat(),
            end=end_time.isoformat(),
            step="1h"
        )

        memory_requests = self.query_range(
            query=memory_requests_query,
            start=start_time.isoformat(),
            end=end_time.isoformat(),
            step="1h"
        )

        return {
            'namespace': namespace,
            'cpu_requests_trend': cpu_requests,
            'memory_requests_trend': memory_requests,
            'period': f"{days} days"
        }
    def get_overcommit_historical(self, days: int = 7) -> Dict[str, Any]:
        """
        Get historical overcommit data.

        Args:
            days: Number of days to look back

        Returns:
            Historical overcommit data
        """
        end_time = datetime.now()
        start_time = end_time - timedelta(days=days)

        # CPU overcommit trend
        cpu_overcommit_query = """
        (
            sum(kube_pod_container_resource_requests{resource="cpu"}) /
            sum(kube_node_status_allocatable{resource="cpu"})
        ) * 100
        """

        # Memory overcommit trend
        memory_overcommit_query = """
        (
            sum(kube_pod_container_resource_requests{resource="memory"}) /
            sum(kube_node_status_allocatable{resource="memory"})
        ) * 100
        """

        cpu_overcommit = self.query_range(
            query=cpu_overcommit_query,
            start=start_time.isoformat(),
            end=end_time.isoformat(),
            step="1h"
        )

        memory_overcommit = self.query_range(
            query=memory_overcommit_query,
            start=start_time.isoformat(),
            end=end_time.isoformat(),
            step="1h"
        )

        return {
            'cpu_overcommit_trend': cpu_overcommit,
            'memory_overcommit_trend': memory_overcommit,
            'period': f"{days} days"
        }
    def get_top_workloads_historical(self, days: int = 7, limit: int = 10) -> Dict[str, Any]:
        """
        Get historical top workloads by resource usage.

        Args:
            days: Number of days to look back
            limit: Number of top workloads to return

        Returns:
            Historical top workloads data
        """
        end_time = datetime.now()
        start_time = end_time - timedelta(days=days)

        # Top CPU consuming workloads
        cpu_query = f"""
        topk({limit},
            avg_over_time(
                rate(container_cpu_usage_seconds_total{{container!="POD",container!=""}}[5m])[1h:1h]
            )
        ) by (namespace, pod, container)
        """

        # Top memory consuming workloads
        memory_query = f"""
        topk({limit},
            avg_over_time(
                container_memory_working_set_bytes{{container!="POD",container!=""}}[1h:1h]
            )
        ) by (namespace, pod, container)
        """

        cpu_workloads = self.query_range(
            query=cpu_query,
            start=start_time.isoformat(),
            end=end_time.isoformat(),
            step="1h"
        )

        memory_workloads = self.query_range(
            query=memory_query,
            start=start_time.isoformat(),
            end=end_time.isoformat(),
            step="1h"
        )

        return {
            'top_cpu_workloads': cpu_workloads,
            'top_memory_workloads': memory_workloads,
            'period': f"{days} days",
            'limit': limit
        }
    def health_check(self) -> Dict[str, Any]:
        """
        Check Thanos connectivity and health.

        Returns:
            Health status
        """
        try:
            response = self.session.get(
                f"{self.thanos_url}/api/v1/status/config",
                timeout=self.timeout
            )
            response.raise_for_status()

            return {
                'status': 'healthy',
                'thanos_url': self.thanos_url,
                'response_time': response.elapsed.total_seconds()
            }
        except Exception as e:
            logger.error(f"Thanos health check failed: {e}")
            return {
                'status': 'unhealthy',
                'thanos_url': self.thanos_url,
                'error': str(e)
            }
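For reference, a minimal usage sketch of ThanosClient outside the FastAPI layer. It relies only on methods defined above and assumes THANOS_URL points at a reachable Thanos Query endpoint:

from datetime import datetime, timedelta

from app.core.thanos_client import ThanosClient

client = ThanosClient()  # resolves THANOS_URL, falling back to http://thanos-query:9090

# Check connectivity before running heavier queries
status = client.health_check()
print(status["status"], status["thanos_url"])

# Raw 30-day range query at 6h resolution for a single namespace
end = datetime.now()
start = end - timedelta(days=30)
result = client.query_range(
    query='sum(kube_pod_container_resource_requests{namespace="default", resource="cpu"})',
    start=start.isoformat(),
    end=end.isoformat(),
    step="6h",
)
print(result.get("status"))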


@@ -5,6 +5,7 @@ from celery import current_task
from app.celery_app import celery_app
from app.core.kubernetes_client import K8sClient
from app.core.prometheus_client import PrometheusClient
from app.core.thanos_client import ThanosClient
from app.services.validation_service import ValidationService
import logging
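The task bodies are not part of this diff; the snippet below is only a hypothetical sketch of how a Celery task in this module could use the newly imported ThanosClient (the task name and signature are assumptions):

@celery_app.task(name="tasks.collect_weekly_trends")  # hypothetical task, not in this commit
def collect_weekly_trends(days: int = 7):
    """Pull weekly utilization trends from Thanos for later reporting."""
    thanos_client = ThanosClient()
    trends = thanos_client.get_resource_utilization_trend(days)
    logging.getLogger(__name__).info("Collected Thanos trends for %s days", days)
    return trends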


@@ -91,6 +91,11 @@ spec:
                configMapKeyRef:
                  name: resource-governance-config
                  key: PROMETHEUS_URL
            - name: THANOS_URL
              valueFrom:
                configMapKeyRef:
                  name: resource-governance-config
                  key: THANOS_URL
            - name: REPORT_EXPORT_PATH
              valueFrom:
                configMapKeyRef:


@@ -23,6 +23,9 @@ data:
  # Prometheus URL
  PROMETHEUS_URL: "https://prometheus-k8s.openshift-monitoring.svc.cluster.local:9091"

  # Thanos URL
  THANOS_URL: "https://thanos-query.openshift-monitoring.svc.cluster.local:9091"

  # Report settings
  REPORT_EXPORT_PATH: "/tmp/reports"


@@ -103,6 +103,11 @@ spec:
                configMapKeyRef:
                  name: resource-governance-config
                  key: PROMETHEUS_URL
            - name: THANOS_URL
              valueFrom:
                configMapKeyRef:
                  name: resource-governance-config
                  key: THANOS_URL
            - name: REPORT_EXPORT_PATH
              valueFrom:
                configMapKeyRef:
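Both Deployments inject THANOS_URL from the same ConfigMap, and ThanosClient reads it at construction time. A quick sanity check of that wiring, assuming the pod environment is populated as above:

import os

# Mirrors ThanosClient._get_thanos_url(): the ConfigMap value wins,
# otherwise the in-code default is used.
thanos_url = os.getenv("THANOS_URL", "http://thanos-query:9090")
print(f"Thanos queries will be sent to {thanos_url}")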