diff --git a/app/api/routes.py b/app/api/routes.py
index 90b288c..6707e3f 100644
--- a/app/api/routes.py
+++ b/app/api/routes.py
@@ -17,6 +17,7 @@ from app.services.report_service import ReportService
 from app.services.historical_analysis import HistoricalAnalysisService
 from app.services.smart_recommendations import SmartRecommendationsService
 from app.core.prometheus_client import PrometheusClient
+from app.core.thanos_client import ThanosClient
 
 logger = logging.getLogger(__name__)
 
@@ -2151,3 +2152,125 @@ async def get_celery_health():
             'error': str(e),
             'timestamp': datetime.now().isoformat()
         }
+
+# ============================================================================
+# HYBRID APIs (Prometheus + Thanos)
+# ============================================================================
+
+@api_router.get("/hybrid/resource-trends")
+async def get_resource_trends(days: int = 7):
+    """
+    Get resource utilization trends using Thanos for historical data.
+    Combines real-time Prometheus data with historical Thanos data.
+    """
+    try:
+        thanos_client = ThanosClient()
+
+        # Get historical trends from Thanos
+        trends = thanos_client.get_resource_utilization_trend(days)
+
+        return {
+            "data_source": "thanos",
+            "period_days": days,
+            "trends": trends,
+            "timestamp": datetime.now().isoformat()
+        }
+
+    except Exception as e:
+        logger.error(f"Error getting resource trends: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+@api_router.get("/hybrid/namespace-trends/{namespace}")
+async def get_namespace_trends(namespace: str, days: int = 7):
+    """
+    Get namespace resource trends using Thanos for historical data.
+    """
+    try:
+        thanos_client = ThanosClient()
+
+        # Get namespace trends from Thanos
+        trends = thanos_client.get_namespace_resource_trends(namespace, days)
+
+        return {
+            "data_source": "thanos",
+            "namespace": namespace,
+            "period_days": days,
+            "trends": trends,
+            "timestamp": datetime.now().isoformat()
+        }
+
+    except Exception as e:
+        logger.error(f"Error getting namespace trends: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+@api_router.get("/hybrid/overcommit-trends")
+async def get_overcommit_trends(days: int = 7):
+    """
+    Get overcommit trends using Thanos for historical data.
+    """
+    try:
+        thanos_client = ThanosClient()
+
+        # Get overcommit trends from Thanos
+        trends = thanos_client.get_overcommit_historical(days)
+
+        return {
+            "data_source": "thanos",
+            "period_days": days,
+            "trends": trends,
+            "timestamp": datetime.now().isoformat()
+        }
+
+    except Exception as e:
+        logger.error(f"Error getting overcommit trends: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+@api_router.get("/hybrid/top-workloads")
+async def get_top_workloads_historical(days: int = 7, limit: int = 10):
+    """
+    Get historical top workloads using Thanos.
+    """
+    try:
+        thanos_client = ThanosClient()
+
+        # Get top workloads from Thanos
+        workloads = thanos_client.get_top_workloads_historical(days, limit)
+
+        return {
+            "data_source": "thanos",
+            "period_days": days,
+            "limit": limit,
+            "workloads": workloads,
+            "timestamp": datetime.now().isoformat()
+        }
+
+    except Exception as e:
+        logger.error(f"Error getting top workloads: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+@api_router.get("/hybrid/health")
+async def get_hybrid_health():
+    """
+    Get health status of both Prometheus and Thanos.
+ """ + try: + prometheus_client = PrometheusClient() + thanos_client = ThanosClient() + + # Check both services + prometheus_health = prometheus_client.health_check() + thanos_health = thanos_client.health_check() + + return { + "prometheus": prometheus_health, + "thanos": thanos_health, + "overall_status": "healthy" if ( + prometheus_health.get("status") == "healthy" and + thanos_health.get("status") == "healthy" + ) else "degraded", + "timestamp": datetime.now().isoformat() + } + + except Exception as e: + logger.error(f"Error checking hybrid health: {e}") + raise HTTPException(status_code=500, detail=str(e)) diff --git a/app/core/thanos_client.py b/app/core/thanos_client.py new file mode 100644 index 0000000..812f25e --- /dev/null +++ b/app/core/thanos_client.py @@ -0,0 +1,345 @@ +""" +Thanos client for historical data queries and aggregations. +Complements PrometheusClient for long-term data analysis. +""" +import requests +import logging +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Any +import json + +logger = logging.getLogger(__name__) + +class ThanosClient: + """ + Client for querying Thanos (OpenShift's historical metrics store). + Used for historical data, trends, and complex aggregations. + """ + + def __init__(self, thanos_url: str = None): + """ + Initialize Thanos client. + + Args: + thanos_url: Thanos query endpoint URL + """ + self.thanos_url = thanos_url or self._get_thanos_url() + self.session = requests.Session() + self.session.timeout = 30 + + def _get_thanos_url(self) -> str: + """Get Thanos URL from environment or use default.""" + import os + return os.getenv('THANOS_URL', 'http://thanos-query:9090') + + def query(self, query: str, time: str = None) -> Dict[str, Any]: + """ + Execute instant query against Thanos. + + Args: + query: PromQL query + time: RFC3339 timestamp (default: now) + + Returns: + Query result + """ + try: + params = {'query': query} + if time: + params['time'] = time + + response = self.session.get( + f"{self.thanos_url}/api/v1/query", + params=params + ) + response.raise_for_status() + return response.json() + + except Exception as e: + logger.error(f"Thanos instant query failed: {e}") + return {'status': 'error', 'error': str(e)} + + def query_range(self, query: str, start: str, end: str, step: str = "1h") -> Dict[str, Any]: + """ + Execute range query against Thanos. + + Args: + query: PromQL query + start: Start time (RFC3339 or relative like "7d") + end: End time (RFC3339 or relative like "now") + step: Query resolution step width + + Returns: + Range query result + """ + try: + params = { + 'query': query, + 'start': start, + 'end': end, + 'step': step + } + + response = self.session.get( + f"{self.thanos_url}/api/v1/query_range", + params=params + ) + response.raise_for_status() + return response.json() + + except Exception as e: + logger.error(f"Thanos range query failed: {e}") + return {'status': 'error', 'error': str(e)} + + def get_cluster_capacity_historical(self, days: int = 7) -> Dict[str, Any]: + """ + Get historical cluster capacity data. 
+
+        Args:
+            days: Number of days to look back
+
+        Returns:
+            Historical capacity data
+        """
+        # Use timezone-aware timestamps so isoformat() yields valid RFC3339 strings
+        end_time = datetime.now(timezone.utc)
+        start_time = end_time - timedelta(days=days)
+
+        # Query for cluster capacity over time
+        query = """
+        max(
+            kube_node_status_capacity{resource="cpu"} * on(node) group_left()
+            kube_node_status_allocatable{resource="cpu"}
+        ) by (cluster)
+        """
+
+        return self.query_range(
+            query=query,
+            start=start_time.isoformat(),
+            end=end_time.isoformat(),
+            step="1h"
+        )
+
+    def get_resource_utilization_trend(self, days: int = 7) -> Dict[str, Any]:
+        """
+        Get historical resource utilization trends.
+
+        Args:
+            days: Number of days to look back
+
+        Returns:
+            Resource utilization trends
+        """
+        end_time = datetime.now(timezone.utc)
+        start_time = end_time - timedelta(days=days)
+
+        # CPU utilization trend
+        cpu_query = """
+        avg(
+            rate(container_cpu_usage_seconds_total{container!="POD",container!=""}[5m])
+        ) by (cluster)
+        """
+
+        # Memory utilization trend
+        memory_query = """
+        avg(
+            container_memory_working_set_bytes{container!="POD",container!=""}
+        ) by (cluster)
+        """
+
+        cpu_data = self.query_range(
+            query=cpu_query,
+            start=start_time.isoformat(),
+            end=end_time.isoformat(),
+            step="1h"
+        )
+
+        memory_data = self.query_range(
+            query=memory_query,
+            start=start_time.isoformat(),
+            end=end_time.isoformat(),
+            step="1h"
+        )
+
+        return {
+            'cpu_trend': cpu_data,
+            'memory_trend': memory_data,
+            'period': f"{days} days",
+            'start_time': start_time.isoformat(),
+            'end_time': end_time.isoformat()
+        }
+
+    def get_namespace_resource_trends(self, namespace: str, days: int = 7) -> Dict[str, Any]:
+        """
+        Get historical resource trends for a specific namespace.
+
+        Args:
+            namespace: Namespace name
+            days: Number of days to look back
+
+        Returns:
+            Namespace resource trends
+        """
+        end_time = datetime.now(timezone.utc)
+        start_time = end_time - timedelta(days=days)
+
+        # CPU requests trend
+        cpu_requests_query = f"""
+        sum(
+            kube_pod_container_resource_requests{{namespace="{namespace}", resource="cpu"}}
+        ) by (namespace)
+        """
+
+        # Memory requests trend
+        memory_requests_query = f"""
+        sum(
+            kube_pod_container_resource_requests{{namespace="{namespace}", resource="memory"}}
+        ) by (namespace)
+        """
+
+        cpu_requests = self.query_range(
+            query=cpu_requests_query,
+            start=start_time.isoformat(),
+            end=end_time.isoformat(),
+            step="1h"
+        )
+
+        memory_requests = self.query_range(
+            query=memory_requests_query,
+            start=start_time.isoformat(),
+            end=end_time.isoformat(),
+            step="1h"
+        )
+
+        return {
+            'namespace': namespace,
+            'cpu_requests_trend': cpu_requests,
+            'memory_requests_trend': memory_requests,
+            'period': f"{days} days"
+        }
+
+    def get_overcommit_historical(self, days: int = 7) -> Dict[str, Any]:
+        """
+        Get historical overcommit data.
+
+        Args:
+            days: Number of days to look back
+
+        Returns:
+            Historical overcommit data
+        """
+        end_time = datetime.now(timezone.utc)
+        start_time = end_time - timedelta(days=days)
+
+        # CPU overcommit trend
+        cpu_overcommit_query = """
+        (
+            sum(kube_pod_container_resource_requests{resource="cpu"}) /
+            sum(kube_node_status_allocatable{resource="cpu"})
+        ) * 100
+        """
+
+        # Memory overcommit trend
+        memory_overcommit_query = """
+        (
+            sum(kube_pod_container_resource_requests{resource="memory"}) /
+            sum(kube_node_status_allocatable{resource="memory"})
+        ) * 100
+        """
+
+        cpu_overcommit = self.query_range(
+            query=cpu_overcommit_query,
+            start=start_time.isoformat(),
+            end=end_time.isoformat(),
+            step="1h"
+        )
+
+        memory_overcommit = self.query_range(
+            query=memory_overcommit_query,
+            start=start_time.isoformat(),
+            end=end_time.isoformat(),
+            step="1h"
+        )
+
+        return {
+            'cpu_overcommit_trend': cpu_overcommit,
+            'memory_overcommit_trend': memory_overcommit,
+            'period': f"{days} days"
+        }
+
+    def get_top_workloads_historical(self, days: int = 7, limit: int = 10) -> Dict[str, Any]:
+        """
+        Get historical top workloads by resource usage.
+
+        Args:
+            days: Number of days to look back
+            limit: Number of top workloads to return
+
+        Returns:
+            Historical top workloads data
+        """
+        end_time = datetime.now(timezone.utc)
+        start_time = end_time - timedelta(days=days)
+
+        # Top CPU consuming workloads
+        cpu_query = f"""
+        topk({limit},
+            avg_over_time(
+                rate(container_cpu_usage_seconds_total{{container!="POD",container!=""}}[5m])[1h:1h]
+            )
+        ) by (namespace, pod, container)
+        """
+
+        # Top Memory consuming workloads
+        memory_query = f"""
+        topk({limit},
+            avg_over_time(
+                container_memory_working_set_bytes{{container!="POD",container!=""}}[1h:1h]
+            )
+        ) by (namespace, pod, container)
+        """
+
+        cpu_workloads = self.query_range(
+            query=cpu_query,
+            start=start_time.isoformat(),
+            end=end_time.isoformat(),
+            step="1h"
+        )
+
+        memory_workloads = self.query_range(
+            query=memory_query,
+            start=start_time.isoformat(),
+            end=end_time.isoformat(),
+            step="1h"
+        )
+
+        return {
+            'top_cpu_workloads': cpu_workloads,
+            'top_memory_workloads': memory_workloads,
+            'period': f"{days} days",
+            'limit': limit
+        }
+
+    def health_check(self) -> Dict[str, Any]:
+        """
+        Check Thanos connectivity and health.
+
+        Returns:
+            Health status
+        """
+        try:
+            response = self.session.get(f"{self.thanos_url}/api/v1/status/config", timeout=self.timeout)
+            response.raise_for_status()
+
+            return {
+                'status': 'healthy',
+                'thanos_url': self.thanos_url,
+                'response_time': response.elapsed.total_seconds()
+            }
+
+        except Exception as e:
+            logger.error(f"Thanos health check failed: {e}")
+            return {
+                'status': 'unhealthy',
+                'thanos_url': self.thanos_url,
+                'error': str(e)
+            }
diff --git a/app/tasks/cluster_analysis.py b/app/tasks/cluster_analysis.py
index 645e8de..76828ca 100644
--- a/app/tasks/cluster_analysis.py
+++ b/app/tasks/cluster_analysis.py
@@ -5,6 +5,7 @@ from celery import current_task
 from app.celery_app import celery_app
 from app.core.kubernetes_client import K8sClient
 from app.core.prometheus_client import PrometheusClient
+from app.core.thanos_client import ThanosClient
 from app.services.validation_service import ValidationService
 import logging
 
diff --git a/k8s/celery-worker-deployment.yaml b/k8s/celery-worker-deployment.yaml
index 39881b9..f761c5b 100644
--- a/k8s/celery-worker-deployment.yaml
+++ b/k8s/celery-worker-deployment.yaml
@@ -91,6 +91,11 @@ spec:
                 configMapKeyRef:
                   name: resource-governance-config
                   key: PROMETHEUS_URL
+            - name: THANOS_URL
+              valueFrom:
+                configMapKeyRef:
+                  name: resource-governance-config
+                  key: THANOS_URL
             - name: REPORT_EXPORT_PATH
               valueFrom:
                 configMapKeyRef:
diff --git a/k8s/configmap.yaml b/k8s/configmap.yaml
index f679654..322c8d1 100644
--- a/k8s/configmap.yaml
+++ b/k8s/configmap.yaml
@@ -23,6 +23,9 @@ data:
   # URL do Prometheus
   PROMETHEUS_URL: "https://prometheus-k8s.openshift-monitoring.svc.cluster.local:9091"
 
+  # Thanos URL
+  THANOS_URL: "https://thanos-query.openshift-monitoring.svc.cluster.local:9091"
+
   # Configurações de relatório
   REPORT_EXPORT_PATH: "/tmp/reports"
 
diff --git a/k8s/deployment.yaml b/k8s/deployment.yaml
index 9209692..318bb35 100644
--- a/k8s/deployment.yaml
+++ b/k8s/deployment.yaml
@@ -103,6 +103,11 @@ spec:
                 configMapKeyRef:
                   name: resource-governance-config
                   key: PROMETHEUS_URL
+            - name: THANOS_URL
+              valueFrom:
+                configMapKeyRef:
+                  name: resource-governance-config
+                  key: THANOS_URL
             - name: REPORT_EXPORT_PATH
              valueFrom:
                configMapKeyRef:
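
Reviewer note (not part of the diff): a minimal smoke-test sketch for the new client is below. It assumes the app package is importable and that an unauthenticated Thanos query endpoint is reachable at http://localhost:9090 (for example via a local port-forward); the in-cluster URL in configmap.yaml is HTTPS and may require auth that ThanosClient does not add. The namespace name is a placeholder.

```python
"""Illustrative usage of ThanosClient; URL and namespace are assumptions."""
from app.core.thanos_client import ThanosClient

client = ThanosClient(thanos_url="http://localhost:9090")  # assumed local endpoint

# Connectivity first; the query helpers return {'status': 'error', ...} on failure.
print(client.health_check())

# Cluster-wide CPU/memory utilization trend over the last 7 days.
trends = client.get_resource_utilization_trend(days=7)
print(trends["period"], sorted(trends.keys()))

# Request trends for a single (placeholder) namespace.
ns = client.get_namespace_resource_trends("example-namespace", days=7)
print(ns["namespace"], ns["period"])
```

The same data is exposed over HTTP by the new routes added in app/api/routes.py, e.g. GET /hybrid/resource-trends?days=7, GET /hybrid/namespace-trends/{namespace}, and GET /hybrid/health.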