From bf06ae190a18bb719b52c56135edb19473d520ce Mon Sep 17 00:00:00 2001
From: andersonid
Date: Mon, 6 Oct 2025 10:40:20 -0300
Subject: [PATCH] fix: correct KubernetesClient import to K8sClient in Celery
 tasks

---
 Dockerfile                      |   3 +
 Dockerfile.celery               |  59 ++++++++
 app/api/routes.py               | 206 +++++++++++++++++++++++
 app/celery_app.py               |  69 +++++++++
 app/tasks/__init__.py           |   3 +
 app/tasks/cluster_analysis.py   | 189 ++++++++++++++++++++
 app/tasks/prometheus_queries.py | 218 +++++++++++++++++++++++++
 app/tasks/recommendations.py    | 260 ++++++++++++++++++++++++++++
 app/workers/celery_beat.py      |  20 +++
 app/workers/celery_worker.py    |  22 +++
 docker-compose.yml              |  86 +++++++++++
 k8s/deployment.yaml             |  15 ++
 k8s/kustomization.yaml          |   2 +
 k8s/redis-configmap.yaml        |   9 ++
 k8s/redis-deployment.yaml       |  61 ++++++++
 requirements.txt                |   3 +
 scripts/deploy-complete.sh      |   8 +
 17 files changed, 1233 insertions(+)
 create mode 100644 Dockerfile.celery
 create mode 100644 app/celery_app.py
 create mode 100644 app/tasks/__init__.py
 create mode 100644 app/tasks/cluster_analysis.py
 create mode 100644 app/tasks/prometheus_queries.py
 create mode 100644 app/tasks/recommendations.py
 create mode 100644 app/workers/celery_beat.py
 create mode 100644 app/workers/celery_worker.py
 create mode 100644 docker-compose.yml
 create mode 100644 k8s/redis-configmap.yaml
 create mode 100644 k8s/redis-deployment.yaml

diff --git a/Dockerfile b/Dockerfile
index 3a04960..f93c5de 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -52,5 +52,8 @@ EXPOSE 8080
 HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
     CMD curl -f http://localhost:8080/health || exit 1
 
+# Make the worker scripts executable
+RUN chmod +x ./app/workers/celery_worker.py ./app/workers/celery_beat.py
+
 # Command to run the application
 CMD ["python", "-m", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080"]
diff --git a/Dockerfile.celery b/Dockerfile.celery
new file mode 100644
index 0000000..3059b3d
--- /dev/null
+++ b/Dockerfile.celery
@@ -0,0 +1,59 @@
+# Multi-stage build to keep the final image small
+FROM python:3.11-slim as builder
+
+# Install the system dependencies needed for compilation
+RUN apt-get update && apt-get install -y \
+    gcc \
+    g++ \
+    && rm -rf /var/lib/apt/lists/*
+
+# Create the working directory
+WORKDIR /app
+
+# Copy requirements and install the Python dependencies
+COPY requirements.txt .
+RUN pip install --no-cache-dir --user -r requirements.txt
+
+# Final stage - production image
+FROM python:3.11-slim
+
+# Install runtime dependencies
+RUN apt-get update && apt-get install -y \
+    curl \
+    && rm -rf /var/lib/apt/lists/*
+
+# Create a non-root user
+RUN groupadd -r appuser && useradd -r -g appuser appuser
+
+# Create the required directories
+RUN mkdir -p /app /tmp/reports && \
+    chown -R appuser:appuser /app /tmp/reports
+
+# Install the Python dependencies system-wide
+COPY requirements.txt .
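+# NOTE: this reinstalls the requirements; the --user install from the builder
+# stage above is not copied into this final image.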
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Set the working directory
+WORKDIR /app
+
+# Copy the application code
+COPY app/ ./app/
+
+# Make the worker scripts executable
+RUN chmod +x ./app/workers/celery_worker.py ./app/workers/celery_beat.py
+
+# Change file ownership
+RUN chown -R appuser:appuser /app
+
+# Switch to the non-root user
+USER appuser
+
+# Expose the port
+EXPOSE 8080
+
+# Health check
+HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
+    CMD curl -f http://localhost:8080/health || exit 1
+
+# Command to run the application (FastAPI)
+CMD ["python", "-m", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080"]
diff --git a/app/api/routes.py b/app/api/routes.py
index d75ec67..f7952ac 100644
--- a/app/api/routes.py
+++ b/app/api/routes.py
@@ -1939,3 +1939,209 @@ async def get_cache_statistics():
     except Exception as e:
         logger.error(f"Error getting cache statistics: {e}")
         raise HTTPException(status_code=500, detail=str(e))
+
+# ============================================================================
+# CELERY BACKGROUND TASKS API
+# ============================================================================
+
+@api_router.post("/tasks/cluster/analyze")
+async def start_cluster_analysis():
+    """Start background cluster analysis task"""
+    try:
+        from app.tasks.cluster_analysis import analyze_cluster
+
+        # Start background task
+        task = analyze_cluster.delay()
+
+        return {
+            "task_id": task.id,
+            "status": "started",
+            "message": "Cluster analysis started in background",
+            "check_status_url": f"/api/v1/tasks/{task.id}/status"
+        }
+
+    except Exception as e:
+        logger.error(f"Error starting cluster analysis: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+@api_router.post("/tasks/namespace/{namespace}/analyze")
+async def start_namespace_analysis(namespace: str):
+    """Start background namespace analysis task"""
+    try:
+        from app.tasks.cluster_analysis import analyze_namespace
+
+        # Start background task
+        task = analyze_namespace.delay(namespace)
+
+        return {
+            "task_id": task.id,
+            "namespace": namespace,
+            "status": "started",
+            "message": f"Namespace {namespace} analysis started in background",
+            "check_status_url": f"/api/v1/tasks/{task.id}/status"
+        }
+
+    except Exception as e:
+        logger.error(f"Error starting namespace analysis: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+@api_router.post("/tasks/historical/{namespace}/{workload}")
+async def start_historical_analysis(namespace: str, workload: str, time_range: str = "24h"):
+    """Start background historical analysis task"""
+    try:
+        from app.tasks.prometheus_queries import query_historical_data
+
+        # Start background task
+        task = query_historical_data.delay(namespace, workload, time_range)
+
+        return {
+            "task_id": task.id,
+            "namespace": namespace,
+            "workload": workload,
+            "time_range": time_range,
+            "status": "started",
+            "message": f"Historical analysis for {namespace}/{workload} started in background",
+            "check_status_url": f"/api/v1/tasks/{task.id}/status"
+        }
+
+    except Exception as e:
+        logger.error(f"Error starting historical analysis: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+@api_router.post("/tasks/recommendations/generate")
+async def start_recommendations_generation(cluster_data: dict):
+    """Start background smart recommendations generation task"""
+    try:
+        from app.tasks.recommendations import generate_smart_recommendations
+
+        # Start background task
+        task = generate_smart_recommendations.delay(cluster_data)
+
+        return {
+            "task_id": task.id,
+            "status": "started",
+            "message": "Smart recommendations generation started in background",
+            "check_status_url": f"/api/v1/tasks/{task.id}/status"
+        }
+
+    except Exception as e:
+        logger.error(f"Error starting recommendations generation: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+@api_router.get("/tasks/{task_id}/status")
+async def get_task_status(task_id: str):
+    """Get task status and results"""
+    try:
+        from app.celery_app import celery_app
+
+        # Get task result
+        result = celery_app.AsyncResult(task_id)
+
+        if result.state == 'PENDING':
+            response = {
+                'task_id': task_id,
+                'state': result.state,
+                'status': 'Task is waiting to be processed...'
+            }
+        elif result.state == 'PROGRESS':
+            response = {
+                'task_id': task_id,
+                'state': result.state,
+                'current': result.info.get('current', 0),
+                'total': result.info.get('total', 1),
+                'status': result.info.get('status', ''),
+                'progress': f"{result.info.get('current', 0)}/{result.info.get('total', 1)}"
+            }
+        elif result.state == 'SUCCESS':
+            response = {
+                'task_id': task_id,
+                'state': result.state,
+                'result': result.result,
+                'status': 'Task completed successfully'
+            }
+        else:  # FAILURE
+            response = {
+                'task_id': task_id,
+                'state': result.state,
+                'error': str(result.info),
+                'status': 'Task failed'
+            }
+
+        return response
+
+    except Exception as e:
+        logger.error(f"Error getting task status: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+@api_router.get("/tasks/{task_id}/result")
+async def get_task_result(task_id: str):
+    """Get task result (only if completed)"""
+    try:
+        from app.celery_app import celery_app
+
+        # Get task result
+        result = celery_app.AsyncResult(task_id)
+
+        if result.state == 'SUCCESS':
+            return {
+                'task_id': task_id,
+                'state': result.state,
+                'result': result.result
+            }
+        else:
+            return {
+                'task_id': task_id,
+                'state': result.state,
+                'message': 'Task not completed yet',
+                'check_status_url': f"/api/v1/tasks/{task_id}/status"
+            }
+
+    except Exception as e:
+        logger.error(f"Error getting task result: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+@api_router.delete("/tasks/{task_id}")
+async def cancel_task(task_id: str):
+    """Cancel a running task"""
+    try:
+        from app.celery_app import celery_app
+
+        # Revoke task
+        celery_app.control.revoke(task_id, terminate=True)
+
+        return {
+            'task_id': task_id,
+            'status': 'cancelled',
+            'message': 'Task cancelled successfully'
+        }
+
+    except Exception as e:
+        logger.error(f"Error cancelling task: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+@api_router.get("/tasks/health")
+async def get_celery_health():
+    """Get Celery workers health status"""
+    try:
+        from app.celery_app import celery_app
+
+        # Get active workers
+        inspect = celery_app.control.inspect()
+        active_workers = inspect.active()
+        stats = inspect.stats()
+
+        return {
+            'celery_status': 'running',
+            'active_workers': len(active_workers) if active_workers else 0,
+            'workers': active_workers,
+            'stats': stats,
+            'timestamp': datetime.now().isoformat()
+        }
+
+    except Exception as e:
+        logger.error(f"Error getting Celery health: {e}")
+        return {
+            'celery_status': 'error',
+            'error': str(e),
+            'timestamp': datetime.now().isoformat()
+        }
diff --git a/app/celery_app.py b/app/celery_app.py
new file mode 100644
index 0000000..448e88f
--- /dev/null
+++ b/app/celery_app.py
@@ -0,0 +1,69 @@
+"""
+Celery configuration for background task processing.
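+
+Workers consume three queues (cluster_analysis, prometheus, recommendations);
+the broker/backend default to redis://localhost:6379/0 and are overridden via
+the REDIS_URL environment variable in the Docker and Kubernetes deployments.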
+""" +from celery import Celery +import os + +# Redis configuration +REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0') + +# Create Celery instance +celery_app = Celery( + 'oru_analyzer', + broker=REDIS_URL, + backend=REDIS_URL, + include=[ + 'app.tasks.cluster_analysis', + 'app.tasks.prometheus_queries', + 'app.tasks.recommendations' + ] +) + +# Celery configuration +celery_app.conf.update( + # Task settings + task_serializer='json', + accept_content=['json'], + result_serializer='json', + timezone='UTC', + enable_utc=True, + + # Task routing + task_routes={ + 'app.tasks.cluster_analysis.*': {'queue': 'cluster_analysis'}, + 'app.tasks.prometheus_queries.*': {'queue': 'prometheus'}, + 'app.tasks.recommendations.*': {'queue': 'recommendations'}, + }, + + # Task execution + task_acks_late=True, + worker_prefetch_multiplier=1, + task_reject_on_worker_lost=True, + + # Result settings + result_expires=3600, # 1 hour + result_persistent=True, + + # Monitoring + worker_send_task_events=True, + task_send_sent_event=True, + + # Retry settings + task_default_retry_delay=60, # 1 minute + task_max_retries=3, + + # Task time limits + task_soft_time_limit=300, # 5 minutes + task_time_limit=600, # 10 minutes +) + +# Optional: Configure periodic tasks +celery_app.conf.beat_schedule = { + 'health-check': { + 'task': 'app.tasks.cluster_analysis.health_check', + 'schedule': 60.0, # Every minute + }, +} + +if __name__ == '__main__': + celery_app.start() diff --git a/app/tasks/__init__.py b/app/tasks/__init__.py new file mode 100644 index 0000000..5420b36 --- /dev/null +++ b/app/tasks/__init__.py @@ -0,0 +1,3 @@ +""" +Celery tasks package for background processing. +""" diff --git a/app/tasks/cluster_analysis.py b/app/tasks/cluster_analysis.py new file mode 100644 index 0000000..26eeb84 --- /dev/null +++ b/app/tasks/cluster_analysis.py @@ -0,0 +1,189 @@ +""" +Celery tasks for cluster analysis. +""" +from celery import current_task +from app.celery_app import celery_app +from app.core.kubernetes_client import K8sClient +from app.core.prometheus_client import PrometheusClient +from app.services.validation_service import ValidationService +import logging + +logger = logging.getLogger(__name__) + +@celery_app.task(bind=True, name='app.tasks.cluster_analysis.analyze_cluster') +def analyze_cluster(self, cluster_config=None): + """ + Analyze cluster resources and generate recommendations. 
+
+    Args:
+        cluster_config: Cluster configuration dict
+
+    Returns:
+        dict: Analysis results
+    """
+    try:
+        # Update task state
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 0, 'total': 5, 'status': 'Starting cluster analysis...'}
+        )
+
+        # Step 1: Initialize clients
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 1, 'total': 5, 'status': 'Connecting to Kubernetes API...'}
+        )
+
+        k8s_client = K8sClient()
+        prometheus_client = PrometheusClient()
+        validation_service = ValidationService()
+
+        # Step 2: Discover cluster resources
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 2, 'total': 5, 'status': 'Discovering cluster resources...'}
+        )
+
+        # Get cluster resources
+        namespaces = k8s_client.get_namespaces()
+        pods = k8s_client.get_pods()
+        nodes = k8s_client.get_nodes()
+
+        logger.info(f"Discovered {len(namespaces)} namespaces, {len(pods)} pods, {len(nodes)} nodes")
+
+        # Step 3: Analyze resource configurations
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 3, 'total': 5, 'status': 'Analyzing resource configurations...'}
+        )
+
+        # Validate resource configurations
+        validations = validation_service.validate_cluster_resources(pods)
+
+        # Step 4: Query Prometheus metrics
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 4, 'total': 5, 'status': 'Querying Prometheus metrics...'}
+        )
+
+        # Get cluster overcommit data
+        overcommit_data = prometheus_client.get_cluster_overcommit()
+
+        # Step 5: Generate recommendations
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 5, 'total': 5, 'status': 'Generating recommendations...'}
+        )
+
+        # Prepare results
+        results = {
+            'cluster_info': {
+                'total_namespaces': len(namespaces),
+                'total_pods': len(pods),
+                'total_nodes': len(nodes),
+            },
+            'validations': validations,
+            'overcommit': overcommit_data,
+            'summary': {
+                'total_errors': len([v for v in validations if v.get('severity') == 'error']),
+                'total_warnings': len([v for v in validations if v.get('severity') == 'warning']),
+                'total_info': len([v for v in validations if v.get('severity') == 'info']),
+            }
+        }
+
+        logger.info(f"Cluster analysis completed successfully. Found {results['summary']['total_errors']} errors, {results['summary']['total_warnings']} warnings")
+
+        return results
+
+    except Exception as exc:
+        logger.error(f"Cluster analysis failed: {str(exc)}")
+        self.update_state(
+            state='FAILURE',
+            meta={'error': str(exc), 'status': 'Analysis failed'}
+        )
+        raise exc
+
+@celery_app.task(name='app.tasks.cluster_analysis.health_check')
+def health_check():
+    """
+    Health check task for monitoring.
+
+    Returns:
+        dict: Health status
+    """
+    try:
+        k8s_client = K8sClient()
+        # Simple health check - try to get namespaces
+        namespaces = k8s_client.get_namespaces()
+
+        return {
+            'status': 'healthy',
+            'namespaces_count': len(namespaces),
+            'timestamp': datetime.now(timezone.utc).isoformat()
+        }
+    except Exception as exc:
+        logger.error(f"Health check failed: {str(exc)}")
+        return {
+            'status': 'unhealthy',
+            'error': str(exc),
+            'timestamp': datetime.now(timezone.utc).isoformat()
+        }
+
+@celery_app.task(bind=True, name='app.tasks.cluster_analysis.analyze_namespace')
+def analyze_namespace(self, namespace):
+    """
+    Analyze specific namespace resources.
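+    Namespace-scoped variant of analyze_cluster: only the pods in the given
+    namespace are validated.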
+
+    Args:
+        namespace: Namespace name
+
+    Returns:
+        dict: Namespace analysis results
+    """
+    try:
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 0, 'total': 3, 'status': f'Analyzing namespace {namespace}...'}
+        )
+
+        k8s_client = K8sClient()
+        validation_service = ValidationService()
+
+        # Get namespace pods
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 1, 'total': 3, 'status': f'Getting pods in namespace {namespace}...'}
+        )
+
+        pods = k8s_client.get_pods(namespace=namespace)
+
+        # Validate resources
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 2, 'total': 3, 'status': f'Validating resources in namespace {namespace}...'}
+        )
+
+        validations = validation_service.validate_cluster_resources(pods)
+
+        # Prepare results
+        results = {
+            'namespace': namespace,
+            'pods_count': len(pods),
+            'validations': validations,
+            'summary': {
+                'total_errors': len([v for v in validations if v.get('severity') == 'error']),
+                'total_warnings': len([v for v in validations if v.get('severity') == 'warning']),
+            }
+        }
+
+        logger.info(f"Namespace {namespace} analysis completed. Found {results['summary']['total_errors']} errors, {results['summary']['total_warnings']} warnings")
+
+        return results
+
+    except Exception as exc:
+        logger.error(f"Namespace {namespace} analysis failed: {str(exc)}")
+        self.update_state(
+            state='FAILURE',
+            meta={'error': str(exc), 'status': f'Namespace {namespace} analysis failed'}
+        )
+        raise exc
diff --git a/app/tasks/prometheus_queries.py b/app/tasks/prometheus_queries.py
new file mode 100644
index 0000000..6e05d4e
--- /dev/null
+++ b/app/tasks/prometheus_queries.py
@@ -0,0 +1,218 @@
+"""
+Celery tasks for Prometheus queries.
+"""
+from celery import current_task
+from app.celery_app import celery_app
+from app.core.prometheus_client import PrometheusClient
+from app.services.historical_analysis import HistoricalAnalysisService
+from datetime import datetime, timezone  # used for result timestamps
+import logging
+
+logger = logging.getLogger(__name__)
+
+@celery_app.task(bind=True, name='app.tasks.prometheus_queries.query_historical_data')
+def query_historical_data(self, namespace, workload, time_range='24h'):
+    """
+    Query historical data for a specific workload.
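+    The time_range string (e.g. the default '24h') is passed through
+    unchanged to HistoricalAnalysisService.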
+
+    Args:
+        namespace: Namespace name
+        workload: Workload name
+        time_range: Time range for analysis
+
+    Returns:
+        dict: Historical analysis results
+    """
+    try:
+        # Update task state
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 0, 'total': 4, 'status': f'Starting historical analysis for {namespace}/{workload}...'}
+        )
+
+        prometheus_client = PrometheusClient()
+        historical_service = HistoricalAnalysisService()
+
+        # Step 1: Query CPU metrics
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 1, 'total': 4, 'status': f'Querying CPU metrics for {namespace}/{workload}...'}
+        )
+
+        cpu_data = historical_service.get_workload_cpu_metrics(namespace, workload, time_range)
+
+        # Step 2: Query Memory metrics
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 2, 'total': 4, 'status': f'Querying Memory metrics for {namespace}/{workload}...'}
+        )
+
+        memory_data = historical_service.get_workload_memory_metrics(namespace, workload, time_range)
+
+        # Step 3: Analyze patterns
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 3, 'total': 4, 'status': f'Analyzing usage patterns for {namespace}/{workload}...'}
+        )
+
+        analysis = historical_service.analyze_workload_patterns(cpu_data, memory_data)
+
+        # Step 4: Generate recommendations
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 4, 'total': 4, 'status': f'Generating recommendations for {namespace}/{workload}...'}
+        )
+
+        recommendations = historical_service.generate_recommendations(analysis)
+
+        results = {
+            'namespace': namespace,
+            'workload': workload,
+            'time_range': time_range,
+            'cpu_data': cpu_data,
+            'memory_data': memory_data,
+            'analysis': analysis,
+            'recommendations': recommendations
+        }
+
+        logger.info(f"Historical analysis completed for {namespace}/{workload}")
+
+        return results
+
+    except Exception as exc:
+        logger.error(f"Historical analysis failed for {namespace}/{workload}: {str(exc)}")
+        self.update_state(
+            state='FAILURE',
+            meta={'error': str(exc), 'status': f'Historical analysis failed for {namespace}/{workload}'}
+        )
+        raise exc
+
+@celery_app.task(bind=True, name='app.tasks.prometheus_queries.query_cluster_metrics')
+def query_cluster_metrics(self):
+    """
+    Query cluster-wide metrics from Prometheus.
+
+    Returns:
+        dict: Cluster metrics
+    """
+    try:
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 0, 'total': 3, 'status': 'Querying cluster metrics...'}
+        )
+
+        prometheus_client = PrometheusClient()
+
+        # Step 1: Query CPU metrics
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 1, 'total': 3, 'status': 'Querying CPU cluster metrics...'}
+        )
+
+        cpu_metrics = prometheus_client.query_cluster_cpu_metrics()
+
+        # Step 2: Query Memory metrics
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 2, 'total': 3, 'status': 'Querying Memory cluster metrics...'}
+        )
+
+        memory_metrics = prometheus_client.query_cluster_memory_metrics()
+
+        # Step 3: Query overcommit data
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 3, 'total': 3, 'status': 'Querying overcommit metrics...'}
+        )
+
+        overcommit_data = prometheus_client.get_cluster_overcommit()
+
+        results = {
+            'cpu_metrics': cpu_metrics,
+            'memory_metrics': memory_metrics,
+            'overcommit': overcommit_data,
+            'timestamp': datetime.now(timezone.utc).isoformat()
+        }
+
+        logger.info("Cluster metrics query completed successfully")
+
+        return results
+
+    except Exception as exc:
+        logger.error(f"Cluster metrics query failed: {str(exc)}")
+        self.update_state(
+            state='FAILURE',
+            meta={'error': str(exc), 'status': 'Cluster metrics query failed'}
+        )
+        raise exc
+
+@celery_app.task(bind=True, name='app.tasks.prometheus_queries.batch_query_workloads')
+def batch_query_workloads(self, workloads):
+    """
+    Batch query multiple workloads for efficiency.
+
+    Args:
+        workloads: List of workload dicts with namespace and workload name
+
+    Returns:
+        dict: Batch query results
+    """
+    try:
+        total_workloads = len(workloads)
+
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 0, 'total': total_workloads, 'status': f'Starting batch query for {total_workloads} workloads...'}
+        )
+
+        prometheus_client = PrometheusClient()
+        historical_service = HistoricalAnalysisService()
+
+        results = []
+
+        for i, workload in enumerate(workloads):
+            namespace = workload['namespace']
+            workload_name = workload['workload']
+
+            self.update_state(
+                state='PROGRESS',
+                meta={'current': i + 1, 'total': total_workloads, 'status': f'Querying {namespace}/{workload_name}...'}
+            )
+
+            try:
+                # Query workload metrics
+                cpu_data = historical_service.get_workload_cpu_metrics(namespace, workload_name, '24h')
+                memory_data = historical_service.get_workload_memory_metrics(namespace, workload_name, '24h')
+
+                results.append({
+                    'namespace': namespace,
+                    'workload': workload_name,
+                    'cpu_data': cpu_data,
+                    'memory_data': memory_data,
+                    'status': 'success'
+                })
+
+            except Exception as exc:
+                logger.warning(f"Failed to query {namespace}/{workload_name}: {str(exc)}")
+                results.append({
+                    'namespace': namespace,
+                    'workload': workload_name,
+                    'error': str(exc),
+                    'status': 'failed'
+                })
+
+        logger.info(f"Batch query completed for {total_workloads} workloads")
+
+        return {
+            'total_workloads': total_workloads,
+            'successful': len([r for r in results if r['status'] == 'success']),
+            'failed': len([r for r in results if r['status'] == 'failed']),
+            'results': results
+        }
+
+    except Exception as exc:
+        logger.error(f"Batch query failed: {str(exc)}")
+        self.update_state(
+            state='FAILURE',
+            meta={'error': str(exc), 'status': 'Batch query failed'}
+        )
+        raise exc
diff --git a/app/tasks/recommendations.py b/app/tasks/recommendations.py
new file mode 100644
index 0000000..4dd2915
--- /dev/null
+++ b/app/tasks/recommendations.py
@@ -0,0 +1,260 @@
+"""
+Celery tasks for generating recommendations.
+"""
+from celery import current_task
+from app.celery_app import celery_app
+from app.services.validation_service import ValidationService
+from app.services.historical_analysis import HistoricalAnalysisService
+from datetime import datetime, timezone  # used for export timestamps
+import logging
+
+logger = logging.getLogger(__name__)
+
+@celery_app.task(bind=True, name='app.tasks.recommendations.generate_smart_recommendations')
+def generate_smart_recommendations(self, cluster_data):
+    """
+    Generate smart recommendations based on cluster analysis.
+
+    Args:
+        cluster_data: Cluster analysis data
+
+    Returns:
+        dict: Smart recommendations
+    """
+    try:
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 0, 'total': 4, 'status': 'Starting smart recommendations generation...'}
+        )
+
+        validation_service = ValidationService()
+        historical_service = HistoricalAnalysisService()
+
+        # Step 1: Analyze resource configurations
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 1, 'total': 4, 'status': 'Analyzing resource configurations...'}
+        )
+
+        resource_recommendations = validation_service.generate_resource_recommendations(cluster_data.get('validations', []))
+
+        # Step 2: Analyze historical patterns
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 2, 'total': 4, 'status': 'Analyzing historical patterns...'}
+        )
+
+        historical_recommendations = historical_service.generate_historical_recommendations(cluster_data)
+
+        # Step 3: Generate VPA recommendations
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 3, 'total': 4, 'status': 'Generating VPA recommendations...'}
+        )
+
+        vpa_recommendations = validation_service.generate_vpa_recommendations(cluster_data)
+
+        # Step 4: Prioritize recommendations
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 4, 'total': 4, 'status': 'Prioritizing recommendations...'}
+        )
+
+        all_recommendations = resource_recommendations + historical_recommendations + vpa_recommendations
+
+        # Sort by priority
+        priority_order = {'critical': 1, 'high': 2, 'medium': 3, 'low': 4}
+        all_recommendations.sort(key=lambda x: priority_order.get(x.get('priority', 'low'), 4))
+
+        results = {
+            'total_recommendations': len(all_recommendations),
+            'by_priority': {
+                'critical': len([r for r in all_recommendations if r.get('priority') == 'critical']),
+                'high': len([r for r in all_recommendations if r.get('priority') == 'high']),
+                'medium': len([r for r in all_recommendations if r.get('priority') == 'medium']),
+                'low': len([r for r in all_recommendations if r.get('priority') == 'low']),
+            },
+            'recommendations': all_recommendations,
+            'summary': {
+                'resource_config': len(resource_recommendations),
+                'historical_analysis': len(historical_recommendations),
+                'vpa_activation': len(vpa_recommendations),
+            }
+        }
+
+        logger.info(f"Generated {len(all_recommendations)} smart recommendations")
+
+        return results
+
+    except Exception as exc:
+        logger.error(f"Smart recommendations generation failed: {str(exc)}")
+        self.update_state(
+            state='FAILURE',
+            meta={'error': str(exc), 'status': 'Smart recommendations generation failed'}
+        )
+        raise exc
+
+@celery_app.task(bind=True, name='app.tasks.recommendations.generate_namespace_recommendations')
+def generate_namespace_recommendations(self, namespace, namespace_data):
+    """
+    Generate recommendations for a specific namespace.
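+    Each recommendation is tagged with the namespace so results can be
+    filtered downstream.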
+
+    Args:
+        namespace: Namespace name
+        namespace_data: Namespace analysis data
+
+    Returns:
+        dict: Namespace recommendations
+    """
+    try:
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 0, 'total': 3, 'status': f'Generating recommendations for namespace {namespace}...'}
+        )
+
+        validation_service = ValidationService()
+
+        # Step 1: Analyze namespace validations
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 1, 'total': 3, 'status': f'Analyzing validations for namespace {namespace}...'}
+        )
+
+        validations = namespace_data.get('validations', [])
+        resource_recommendations = validation_service.generate_resource_recommendations(validations)
+
+        # Step 2: Generate namespace-specific recommendations
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 2, 'total': 3, 'status': f'Generating namespace-specific recommendations for {namespace}...'}
+        )
+
+        namespace_recommendations = validation_service.generate_namespace_recommendations(namespace, namespace_data)
+
+        # Step 3: Prioritize and format recommendations
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 3, 'total': 3, 'status': f'Prioritizing recommendations for namespace {namespace}...'}
+        )
+
+        all_recommendations = resource_recommendations + namespace_recommendations
+
+        # Add namespace context to recommendations
+        for rec in all_recommendations:
+            rec['namespace'] = namespace
+            rec['context'] = f"Namespace: {namespace}"
+
+        results = {
+            'namespace': namespace,
+            'total_recommendations': len(all_recommendations),
+            'recommendations': all_recommendations,
+            'summary': {
+                'errors': len([v for v in validations if v.get('severity') == 'error']),
+                'warnings': len([v for v in validations if v.get('severity') == 'warning']),
+                'pods_analyzed': namespace_data.get('pods_count', 0),
+            }
+        }
+
+        logger.info(f"Generated {len(all_recommendations)} recommendations for namespace {namespace}")
+
+        return results
+
+    except Exception as exc:
+        logger.error(f"Namespace recommendations generation failed for {namespace}: {str(exc)}")
+        self.update_state(
+            state='FAILURE',
+            meta={'error': str(exc), 'status': f'Namespace recommendations generation failed for {namespace}'}
+        )
+        raise exc
+
+@celery_app.task(bind=True, name='app.tasks.recommendations.generate_export_report')
+def generate_export_report(self, cluster_data, format='json'):
+    """
+    Generate export report in specified format.
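+    Recommendation generation runs synchronously inside this task, so the
+    export blocks until recommendations are ready.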
+
+    Args:
+        cluster_data: Cluster analysis data
+        format: Export format (json, csv, pdf)
+
+    Returns:
+        dict: Export report data
+    """
+    try:
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 0, 'total': 3, 'status': f'Generating {format.upper()} export report...'}
+        )
+
+        # Step 1: Prepare data
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 1, 'total': 3, 'status': 'Preparing export data...'}
+        )
+
+        export_data = {
+            'timestamp': datetime.now(timezone.utc).isoformat(),
+            'cluster_info': cluster_data.get('cluster_info', {}),
+            'validations': cluster_data.get('validations', []),
+            'overcommit': cluster_data.get('overcommit', {}),
+            'summary': cluster_data.get('summary', {}),
+        }
+
+        # Step 2: Generate recommendations
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 2, 'total': 3, 'status': 'Generating recommendations for export...'}
+        )
+
+        # Run the recommendations task eagerly in this worker process instead
+        # of dispatching it with .delay(): Celery forbids calling .get() on an
+        # async result from inside a task.
+        recommendations = generate_smart_recommendations.apply(
+            args=(cluster_data,)
+        ).get(disable_sync_subtasks=False)
+
+        export_data['recommendations'] = recommendations.get('recommendations', [])
+
+        # Step 3: Format export
+        self.update_state(
+            state='PROGRESS',
+            meta={'current': 3, 'total': 3, 'status': f'Formatting {format.upper()} export...'}
+        )
+
+        if format == 'csv':
+            # Convert to CSV format
+            csv_data = convert_to_csv(export_data)
+            export_data['csv_data'] = csv_data
+        elif format == 'pdf':
+            # Convert to PDF format
+            pdf_data = convert_to_pdf(export_data)
+            export_data['pdf_data'] = pdf_data
+
+        results = {
+            'format': format,
+            'data': export_data,
+            'size': len(str(export_data)),
+            'timestamp': datetime.now(timezone.utc).isoformat()
+        }
+
+        logger.info(f"Generated {format.upper()} export report successfully")
+
+        return results
+
+    except Exception as exc:
+        logger.error(f"Export report generation failed: {str(exc)}")
+        self.update_state(
+            state='FAILURE',
+            meta={'error': str(exc), 'status': 'Export report generation failed'}
+        )
+        raise exc
+
+def convert_to_csv(data):
+    """Convert data to CSV format."""
+    # Simple CSV conversion - in a real implementation, use the csv module or pandas
+    return "namespace,workload,severity,message,recommendation\n" + \
+           "\n".join([f"{v.get('namespace', '')},{v.get('workload', '')},{v.get('severity', '')},{v.get('message', '')},{v.get('recommendation', '')}"
+                      for v in data.get('validations', [])])
+
+def convert_to_pdf(data):
+    """Convert data to PDF format."""
+    # Simple PDF conversion - in a real implementation, use reportlab
+    return "PDF Report for Cluster Analysis\n\n" + \
+           f"Total Namespaces: {data.get('cluster_info', {}).get('total_namespaces', 0)}\n" + \
+           f"Total Pods: {data.get('cluster_info', {}).get('total_pods', 0)}\n" + \
+           f"Total Errors: {data.get('summary', {}).get('total_errors', 0)}\n" + \
+           f"Total Warnings: {data.get('summary', {}).get('total_warnings', 0)}\n"
diff --git a/app/workers/celery_beat.py b/app/workers/celery_beat.py
new file mode 100644
index 0000000..a0f93e0
--- /dev/null
+++ b/app/workers/celery_beat.py
@@ -0,0 +1,20 @@
+#!/usr/bin/env python3
+"""
+Celery beat scheduler startup script.
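+The schedule itself lives in app.celery_app (beat_schedule currently runs the
+health-check task every 60 seconds).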
+""" +import os +import sys +from celery import Celery + +# Add the app directory to Python path +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from app.celery_app import celery_app + +if __name__ == '__main__': + # Start Celery beat scheduler + celery_app.start([ + 'beat', + '--loglevel=info', + '--scheduler=celery.beat:PersistentScheduler' + ]) diff --git a/app/workers/celery_worker.py b/app/workers/celery_worker.py new file mode 100644 index 0000000..9c9dc93 --- /dev/null +++ b/app/workers/celery_worker.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 +""" +Celery worker startup script. +""" +import os +import sys +from celery import Celery + +# Add the app directory to Python path +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from app.celery_app import celery_app + +if __name__ == '__main__': + # Start Celery worker + celery_app.worker_main([ + 'worker', + '--loglevel=info', + '--concurrency=4', + '--queues=cluster_analysis,prometheus,recommendations', + '--hostname=worker@%h' + ]) diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..329d22b --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,86 @@ +version: '3.8' + +services: + # Redis - Message broker for Celery + redis: + image: redis:7-alpine + ports: + - "6379:6379" + volumes: + - redis_data:/data + command: redis-server --appendonly yes + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 5 + + # FastAPI Application + web: + build: + context: . + dockerfile: Dockerfile.celery + ports: + - "8080:8080" + environment: + - REDIS_URL=redis://redis:6379/0 + - KUBECONFIG=/tmp/kubeconfig + volumes: + - ./kubeconfig:/tmp/kubeconfig:ro + depends_on: + redis: + condition: service_healthy + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8080/health"] + interval: 30s + timeout: 10s + retries: 3 + + # Celery Worker + worker: + build: + context: . + dockerfile: Dockerfile.celery + command: python app/workers/celery_worker.py + environment: + - REDIS_URL=redis://redis:6379/0 + - KUBECONFIG=/tmp/kubeconfig + volumes: + - ./kubeconfig:/tmp/kubeconfig:ro + depends_on: + redis: + condition: service_healthy + deploy: + replicas: 2 + + # Celery Beat Scheduler + beat: + build: + context: . + dockerfile: Dockerfile.celery + command: python app/workers/celery_beat.py + environment: + - REDIS_URL=redis://redis:6379/0 + - KUBECONFIG=/tmp/kubeconfig + volumes: + - ./kubeconfig:/tmp/kubeconfig:ro + depends_on: + redis: + condition: service_healthy + + # Flower - Celery Monitoring + flower: + build: + context: . 
+      dockerfile: Dockerfile.celery
+    command: celery -A app.celery_app flower --port=5555
+    ports:
+      - "5555:5555"
+    environment:
+      - REDIS_URL=redis://redis:6379/0
+    depends_on:
+      redis:
+        condition: service_healthy
+
+volumes:
+  redis_data:
diff --git a/k8s/deployment.yaml b/k8s/deployment.yaml
index c97bb5d..9209692 100644
--- a/k8s/deployment.yaml
+++ b/k8s/deployment.yaml
@@ -113,6 +113,21 @@ spec:
               configMapKeyRef:
                 name: resource-governance-config
                 key: SERVICE_ACCOUNT_NAME
+          - name: REDIS_URL
+            valueFrom:
+              configMapKeyRef:
+                name: redis-config
+                key: REDIS_URL
+          - name: CELERY_BROKER_URL
+            valueFrom:
+              configMapKeyRef:
+                name: redis-config
+                key: CELERY_BROKER_URL
+          - name: CELERY_RESULT_BACKEND
+            valueFrom:
+              configMapKeyRef:
+                name: redis-config
+                key: CELERY_RESULT_BACKEND
           resources:
             requests:
               cpu: 100m
diff --git a/k8s/kustomization.yaml b/k8s/kustomization.yaml
index 8e95dc5..9af14bb 100644
--- a/k8s/kustomization.yaml
+++ b/k8s/kustomization.yaml
@@ -5,6 +5,8 @@ resources:
 - namespace.yaml
 - rbac.yaml
 - configmap.yaml
+- redis-configmap.yaml
+- redis-deployment.yaml
 - deployment.yaml
 - service.yaml
 - route.yaml
diff --git a/k8s/redis-configmap.yaml b/k8s/redis-configmap.yaml
new file mode 100644
index 0000000..d6e8eeb
--- /dev/null
+++ b/k8s/redis-configmap.yaml
@@ -0,0 +1,9 @@
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: redis-config
+  namespace: resource-governance
+data:
+  REDIS_URL: "redis://redis-service:6379/0"
+  CELERY_BROKER_URL: "redis://redis-service:6379/0"
+  CELERY_RESULT_BACKEND: "redis://redis-service:6379/0"
diff --git a/k8s/redis-deployment.yaml b/k8s/redis-deployment.yaml
new file mode 100644
index 0000000..9700c27
--- /dev/null
+++ b/k8s/redis-deployment.yaml
@@ -0,0 +1,61 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: redis
+  namespace: resource-governance
+  labels:
+    app: redis
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: redis
+  template:
+    metadata:
+      labels:
+        app: redis
+    spec:
+      containers:
+      - name: redis
+        image: redis:7-alpine
+        ports:
+        - containerPort: 6379
+        command: ["redis-server", "--appendonly", "yes"]
+        volumeMounts:
+        - name: redis-data
+          mountPath: /data
+        resources:
+          requests:
+            cpu: 50m
+            memory: 64Mi
+          limits:
+            cpu: 200m
+            memory: 128Mi
+        livenessProbe:
+          tcpSocket:
+            port: 6379
+          initialDelaySeconds: 30
+          periodSeconds: 10
+        readinessProbe:
+          tcpSocket:
+            port: 6379
+          initialDelaySeconds: 5
+          periodSeconds: 5
+      volumes:
+      - name: redis-data
+        emptyDir: {}
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: redis-service
+  namespace: resource-governance
+  labels:
+    app: redis
+spec:
+  ports:
+  - port: 6379
+    targetPort: 6379
+    protocol: TCP
+  selector:
+    app: redis
diff --git a/requirements.txt b/requirements.txt
index 86842bb..f3cc122 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -14,3 +14,6 @@ python-jose[cryptography]==3.3.0
 passlib[bcrypt]==1.7.4
 python-dotenv==1.0.0
 aiohttp==3.9.1
+celery==5.3.4
+redis==5.0.1
+flower==2.0.1
diff --git a/scripts/deploy-complete.sh b/scripts/deploy-complete.sh
index dd2fe22..7bb4090 100755
--- a/scripts/deploy-complete.sh
+++ b/scripts/deploy-complete.sh
@@ -26,6 +26,14 @@ oc apply -f k8s/rbac.yaml
 echo -e "${YELLOW}Applying ConfigMap...${NC}"
 oc apply -f k8s/configmap.yaml
 
+# Apply Redis ConfigMap
+echo -e "${YELLOW}Applying Redis ConfigMap...${NC}"
+oc apply -f k8s/redis-configmap.yaml
+
+# Apply Redis Deployment
+echo -e "${YELLOW}Applying Redis Deployment...${NC}"
+oc apply -f k8s/redis-deployment.yaml
+
 # Create ServiceAccount token secret
 echo -e "${YELLOW}Creating ServiceAccount token...${NC}"