diff --git a/app/api/routes.py b/app/api/routes.py index f4f1e19..ae680a0 100644 --- a/app/api/routes.py +++ b/app/api/routes.py @@ -1738,6 +1738,199 @@ async def health_check(): "version": "1.0.0" } +# ============================================================================ +# BATCH PROCESSING ENDPOINTS - For Large Clusters (10,000+ pods) +# ============================================================================ + +@api_router.get("/batch/statistics") +async def get_batch_statistics( + k8s_client=Depends(get_k8s_client) +): + """Get batch processing statistics for the cluster""" + try: + from app.tasks.batch_analysis import get_batch_statistics + + # Start the task + task = get_batch_statistics.delay() + + return { + "task_id": task.id, + "status": "started", + "message": "Batch statistics calculation started", + "timestamp": datetime.now().isoformat() + } + + except Exception as e: + logger.error(f"Error starting batch statistics: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@api_router.get("/batch/statistics/{task_id}") +async def get_batch_statistics_result(task_id: str): + """Get batch statistics result""" + try: + from app.tasks.batch_analysis import get_batch_statistics + + # Get task result + task = get_batch_statistics.AsyncResult(task_id) + + if task.state == 'PENDING': + return { + "task_id": task_id, + "status": "pending", + "message": "Task is still processing..." + } + elif task.state == 'PROGRESS': + return { + "task_id": task_id, + "status": "progress", + "message": "Task is in progress...", + "meta": task.info + } + elif task.state == 'SUCCESS': + return { + "task_id": task_id, + "status": "success", + "result": task.result + } + else: + return { + "task_id": task_id, + "status": "error", + "error": str(task.info) + } + + except Exception as e: + logger.error(f"Error getting batch statistics result: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@api_router.post("/batch/process") +async def start_batch_processing( + namespace: Optional[str] = None, + include_system_namespaces: bool = False, + batch_size: int = 100 +): + """Start batch processing for large clusters""" + try: + from app.tasks.batch_analysis import process_cluster_batch + + # Validate batch size + if batch_size < 10 or batch_size > 500: + raise HTTPException( + status_code=400, + detail="Batch size must be between 10 and 500" + ) + + # Start the task + task = process_cluster_batch.delay({ + 'namespace': namespace, + 'include_system_namespaces': include_system_namespaces, + 'batch_size': batch_size + }) + + return { + "task_id": task.id, + "status": "started", + "message": f"Batch processing started with batch size {batch_size}", + "namespace": namespace, + "include_system_namespaces": include_system_namespaces, + "timestamp": datetime.now().isoformat() + } + + except Exception as e: + logger.error(f"Error starting batch processing: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@api_router.get("/batch/process/{task_id}") +async def get_batch_processing_result(task_id: str): + """Get batch processing result""" + try: + from app.tasks.batch_analysis import process_cluster_batch + + # Get task result + task = process_cluster_batch.AsyncResult(task_id) + + if task.state == 'PENDING': + return { + "task_id": task_id, + "status": "pending", + "message": "Task is still processing..." + } + elif task.state == 'PROGRESS': + return { + "task_id": task_id, + "status": "progress", + "message": "Task is in progress...", + "meta": task.info + } + elif task.state == 'SUCCESS': + return { + "task_id": task_id, + "status": "success", + "result": task.result + } + else: + return { + "task_id": task_id, + "status": "error", + "error": str(task.info) + } + + except Exception as e: + logger.error(f"Error getting batch processing result: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@api_router.get("/batch/validations") +async def get_batch_validations( + namespace: Optional[str] = None, + severity: Optional[str] = None, + page: int = 1, + page_size: int = 50, + include_system_namespaces: bool = False, + k8s_client=Depends(get_k8s_client) +): + """Get validations using batch processing for large clusters""" + try: + from app.services.batch_processing import batch_processing_service + + # Get all validations using batch processing + all_validations = [] + + async for batch_result in batch_processing_service.process_cluster_in_batches( + k8s_client, + namespace=namespace, + include_system_namespaces=include_system_namespaces + ): + all_validations.extend(batch_result.validations) + + # Filter by severity if specified + if severity: + all_validations = [ + v for v in all_validations if v.get('severity') == severity + ] + + # Pagination + total = len(all_validations) + start = (page - 1) * page_size + end = start + page_size + paginated_validations = all_validations[start:end] + + return { + "validations": paginated_validations, + "pagination": { + "page": page, + "page_size": page_size, + "total": total, + "total_pages": (total + page_size - 1) // page_size + }, + "processing_method": "batch", + "batch_size": batch_processing_service.batch_size, + "timestamp": datetime.now().isoformat() + } + + except Exception as e: + logger.error(f"Error getting batch validations: {e}") + raise HTTPException(status_code=500, detail=str(e)) + # ============================================================================ # OPTIMIZED ENDPOINTS - 10x Performance Improvement # ============================================================================ diff --git a/app/core/config.py b/app/core/config.py index 6c3cb01..7b14ed2 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -57,6 +57,11 @@ class Settings(BaseSettings): enable_rbac: bool = True service_account_name: str = "resource-governance-sa" + # Batch processing settings + batch_size: int = Field(default=100, alias="BATCH_SIZE") + max_batch_size: int = Field(default=500, alias="MAX_BATCH_SIZE") + min_batch_size: int = Field(default=10, alias="MIN_BATCH_SIZE") + class Config: env_file = ".env" case_sensitive = False diff --git a/app/services/batch_processing.py b/app/services/batch_processing.py new file mode 100644 index 0000000..199cb92 --- /dev/null +++ b/app/services/batch_processing.py @@ -0,0 +1,284 @@ +""" +Batch Processing Service for Large Clusters + +This service implements intelligent batch processing to handle large clusters +efficiently by processing pods in batches of 100, reducing memory usage and +improving performance for clusters with 10,000+ pods. +""" + +import asyncio +import logging +from typing import List, Dict, Any, Optional, AsyncGenerator, Tuple +from dataclasses import dataclass +from datetime import datetime +import gc + +from app.core.kubernetes_client import K8sClient, PodResource +from app.services.validation_service import ValidationService +from app.services.smart_recommendations import SmartRecommendationsService +from app.services.historical_analysis import HistoricalAnalysisService + +logger = logging.getLogger(__name__) + +@dataclass +class BatchResult: + """Result of a batch processing operation""" + batch_number: int + total_batches: int + pods_processed: int + validations: List[Dict[str, Any]] + recommendations: List[Dict[str, Any]] + processing_time: float + memory_usage: float + errors: List[str] + +@dataclass +class BatchProgress: + """Progress tracking for batch processing""" + current_batch: int + total_batches: int + pods_processed: int + total_pods: int + validations_found: int + recommendations_generated: int + processing_time: float + estimated_completion: Optional[datetime] + status: str # 'running', 'completed', 'error', 'paused' + +class BatchProcessingService: + """Service for processing large clusters in batches""" + + def __init__(self, batch_size: int = 100): + self.batch_size = batch_size + self.validation_service = ValidationService() + self.smart_recommendations_service = SmartRecommendationsService() + self.historical_service = HistoricalAnalysisService() + + async def process_cluster_in_batches( + self, + k8s_client: K8sClient, + namespace: Optional[str] = None, + include_system_namespaces: bool = False, + progress_callback: Optional[callable] = None + ) -> AsyncGenerator[BatchResult, None]: + """ + Process cluster pods in batches with progress tracking + + Args: + k8s_client: Kubernetes client instance + namespace: Optional namespace filter + include_system_namespaces: Whether to include system namespaces + progress_callback: Optional callback for progress updates + + Yields: + BatchResult: Results for each batch processed + """ + try: + # Get all pods + if namespace: + namespace_resources = await k8s_client.get_namespace_resources(namespace) + all_pods = namespace_resources.pods + else: + all_pods = await k8s_client.get_all_pods(include_system_namespaces=include_system_namespaces) + + total_pods = len(all_pods) + total_batches = (total_pods + self.batch_size - 1) // self.batch_size + + logger.info(f"Starting batch processing: {total_pods} pods in {total_batches} batches of {self.batch_size}") + + # Process pods in batches + for batch_num in range(total_batches): + start_idx = batch_num * self.batch_size + end_idx = min(start_idx + self.batch_size, total_pods) + batch_pods = all_pods[start_idx:end_idx] + + # Process this batch + batch_result = await self._process_batch( + batch_num + 1, + total_batches, + batch_pods, + start_idx, + total_pods + ) + + # Update progress + if progress_callback: + progress = BatchProgress( + current_batch=batch_num + 1, + total_batches=total_batches, + pods_processed=end_idx, + total_pods=total_pods, + validations_found=sum(len(r.validations) for r in batch_result), + recommendations_generated=sum(len(r.recommendations) for r in batch_result), + processing_time=batch_result.processing_time, + estimated_completion=None, # Could calculate based on avg time + status='running' + ) + progress_callback(progress) + + yield batch_result + + # Memory cleanup after each batch + await self._cleanup_memory() + + # Small delay to prevent overwhelming the system + await asyncio.sleep(0.1) + + except Exception as e: + logger.error(f"Error in batch processing: {e}", exc_info=True) + raise + + async def _process_batch( + self, + batch_number: int, + total_batches: int, + pods: List[PodResource], + start_idx: int, + total_pods: int + ) -> BatchResult: + """Process a single batch of pods""" + start_time = datetime.now() + errors = [] + validations = [] + recommendations = [] + + try: + logger.info(f"Processing batch {batch_number}/{total_batches}: {len(pods)} pods") + + # Process validations for this batch + for pod in pods: + try: + pod_validations = self.validation_service.validate_pod_resources(pod) + for validation in pod_validations: + validations.append({ + 'pod_name': validation.pod_name, + 'namespace': validation.namespace, + 'container_name': validation.container_name, + 'validation_type': validation.validation_type, + 'severity': validation.severity, + 'message': validation.message, + 'recommendation': validation.recommendation, + 'priority_score': validation.priority_score, + 'workload_category': validation.workload_category, + 'estimated_impact': validation.estimated_impact + }) + except Exception as e: + error_msg = f"Error validating pod {pod.name}: {str(e)}" + logger.warning(error_msg) + errors.append(error_msg) + + # Generate smart recommendations for this batch + try: + batch_recommendations = await self.smart_recommendations_service.generate_smart_recommendations(pods, []) + for rec in batch_recommendations: + recommendations.append({ + 'workload_name': rec.workload_name, + 'namespace': rec.namespace, + 'recommendation_type': rec.recommendation_type, + 'priority_score': rec.priority_score, + 'title': rec.title, + 'description': rec.description, + 'estimated_impact': rec.estimated_impact, + 'implementation_effort': rec.implementation_effort + }) + except Exception as e: + error_msg = f"Error generating recommendations for batch {batch_number}: {str(e)}" + logger.warning(error_msg) + errors.append(error_msg) + + processing_time = (datetime.now() - start_time).total_seconds() + + return BatchResult( + batch_number=batch_number, + total_batches=total_batches, + pods_processed=len(pods), + validations=validations, + recommendations=recommendations, + processing_time=processing_time, + memory_usage=self._get_memory_usage(), + errors=errors + ) + + except Exception as e: + processing_time = (datetime.now() - start_time).total_seconds() + error_msg = f"Error processing batch {batch_number}: {str(e)}" + logger.error(error_msg, exc_info=True) + + return BatchResult( + batch_number=batch_number, + total_batches=total_batches, + pods_processed=len(pods), + validations=[], + recommendations=[], + processing_time=processing_time, + memory_usage=self._get_memory_usage(), + errors=[error_msg] + ) + + async def _cleanup_memory(self): + """Clean up memory after each batch""" + try: + # Force garbage collection + gc.collect() + + # Small delay to allow memory cleanup + await asyncio.sleep(0.01) + + except Exception as e: + logger.warning(f"Error during memory cleanup: {e}") + + def _get_memory_usage(self) -> float: + """Get current memory usage in MB""" + try: + import psutil + process = psutil.Process() + return process.memory_info().rss / 1024 / 1024 # Convert to MB + except ImportError: + return 0.0 + except Exception: + return 0.0 + + async def get_batch_statistics(self, k8s_client: K8sClient) -> Dict[str, Any]: + """Get statistics about batch processing for the cluster""" + try: + all_pods = await k8s_client.get_all_pods(include_system_namespaces=False) + total_pods = len(all_pods) + total_batches = (total_pods + self.batch_size - 1) // self.batch_size + + # Group by namespace + namespace_counts = {} + for pod in all_pods: + namespace_counts[pod.namespace] = namespace_counts.get(pod.namespace, 0) + 1 + + return { + 'total_pods': total_pods, + 'total_namespaces': len(namespace_counts), + 'batch_size': self.batch_size, + 'total_batches': total_batches, + 'estimated_processing_time': total_batches * 2.0, # 2 seconds per batch estimate + 'namespace_distribution': namespace_counts, + 'memory_efficiency': 'High' if total_batches > 10 else 'Standard', + 'recommended_batch_size': self._recommend_batch_size(total_pods) + } + + except Exception as e: + logger.error(f"Error getting batch statistics: {e}", exc_info=True) + return { + 'error': str(e), + 'total_pods': 0, + 'total_batches': 0 + } + + def _recommend_batch_size(self, total_pods: int) -> int: + """Recommend optimal batch size based on cluster size""" + if total_pods < 1000: + return 50 + elif total_pods < 5000: + return 100 + elif total_pods < 10000: + return 150 + else: + return 200 + +# Global instance +batch_processing_service = BatchProcessingService() diff --git a/app/tasks/batch_analysis.py b/app/tasks/batch_analysis.py new file mode 100644 index 0000000..d58c843 --- /dev/null +++ b/app/tasks/batch_analysis.py @@ -0,0 +1,226 @@ +""" +Celery tasks for batch processing of large clusters +""" + +import asyncio +import logging +from typing import Dict, Any, List +from datetime import datetime +import os + +from app.celery_app import celery_app +from app.services.batch_processing import batch_processing_service, BatchProgress +from app.core.kubernetes_client import K8sClient + +logger = logging.getLogger(__name__) + +@celery_app.task(bind=True, name='app.tasks.batch_analysis.process_cluster_batch') +def process_cluster_batch(self, cluster_config: Dict[str, Any] = None): + """ + Process cluster analysis in batches for large clusters + + Args: + cluster_config: Cluster configuration dict + + Returns: + dict: Batch processing results + """ + try: + # Update task state + self.update_state( + state='PROGRESS', + meta={ + 'current': 0, + 'total': 1, + 'status': 'Starting batch processing...', + 'batch_number': 0, + 'total_batches': 0, + 'pods_processed': 0, + 'total_pods': 0 + } + ) + + # Initialize clients + k8s_client = K8sClient() + + # Run async processing + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + result = loop.run_until_complete(_process_cluster_async(self, k8s_client, cluster_config)) + return result + finally: + loop.close() + + except Exception as exc: + logger.error(f"Batch processing failed: {str(exc)}", exc_info=True) + return { + 'error': str(exc), + 'status': 'failed', + 'timestamp': datetime.now().isoformat() + } + +async def _process_cluster_async(task, k8s_client: K8sClient, cluster_config: Dict[str, Any]): + """Async processing function""" + try: + # Initialize K8s client + await k8s_client.initialize() + + # Get batch statistics + batch_stats = await batch_processing_service.get_batch_statistics(k8s_client) + + # Update task with statistics + task.update_state( + state='PROGRESS', + meta={ + 'current': 1, + 'total': batch_stats.get('total_batches', 1), + 'status': f"Processing {batch_stats.get('total_pods', 0)} pods in {batch_stats.get('total_batches', 0)} batches...", + 'batch_number': 0, + 'total_batches': batch_stats.get('total_batches', 0), + 'pods_processed': 0, + 'total_pods': batch_stats.get('total_pods', 0), + 'statistics': batch_stats + } + ) + + # Process in batches + all_validations = [] + all_recommendations = [] + total_errors = [] + total_processing_time = 0 + + batch_count = 0 + + async for batch_result in batch_processing_service.process_cluster_in_batches( + k8s_client, + namespace=cluster_config.get('namespace') if cluster_config else None, + include_system_namespaces=cluster_config.get('include_system_namespaces', False) if cluster_config else False, + progress_callback=lambda progress: _update_task_progress(task, progress) + ): + batch_count += 1 + + # Collect results + all_validations.extend(batch_result.validations) + all_recommendations.extend(batch_result.recommendations) + total_errors.extend(batch_result.errors) + total_processing_time += batch_result.processing_time + + # Update task progress + task.update_state( + state='PROGRESS', + meta={ + 'current': batch_count, + 'total': batch_result.total_batches, + 'status': f"Completed batch {batch_count}/{batch_result.total_batches} - {len(all_validations)} validations found", + 'batch_number': batch_count, + 'total_batches': batch_result.total_batches, + 'pods_processed': batch_count * batch_processing_service.batch_size, + 'total_pods': batch_stats.get('total_pods', 0), + 'validations_found': len(all_validations), + 'recommendations_generated': len(all_recommendations), + 'processing_time': total_processing_time, + 'memory_usage': batch_result.memory_usage, + 'errors': len(total_errors) + } + ) + + # Final results + results = { + 'timestamp': datetime.now().isoformat(), + 'total_pods': batch_stats.get('total_pods', 0), + 'total_batches': batch_count, + 'batch_size': batch_processing_service.batch_size, + 'total_validations': len(all_validations), + 'total_recommendations': len(all_recommendations), + 'total_errors': len(total_errors), + 'processing_time': total_processing_time, + 'statistics': batch_stats, + 'validations': all_validations, + 'recommendations': all_recommendations, + 'errors': total_errors, + 'status': 'completed' + } + + logger.info(f"Batch processing completed: {len(all_validations)} validations, {len(all_recommendations)} recommendations in {total_processing_time:.2f}s") + + return results + + except Exception as e: + logger.error(f"Error in async batch processing: {e}", exc_info=True) + raise + +def _update_task_progress(task, progress: BatchProgress): + """Update Celery task progress""" + try: + task.update_state( + state='PROGRESS', + meta={ + 'current': progress.current_batch, + 'total': progress.total_batches, + 'status': f"Processing batch {progress.current_batch}/{progress.total_batches} - {progress.pods_processed}/{progress.total_pods} pods", + 'batch_number': progress.current_batch, + 'total_batches': progress.total_batches, + 'pods_processed': progress.pods_processed, + 'total_pods': progress.total_pods, + 'validations_found': progress.validations_found, + 'recommendations_generated': progress.recommendations_generated, + 'processing_time': progress.processing_time, + 'estimated_completion': progress.estimated_completion.isoformat() if progress.estimated_completion else None + } + ) + except Exception as e: + logger.warning(f"Error updating task progress: {e}") + +@celery_app.task(bind=True, name='app.tasks.batch_analysis.get_batch_statistics') +def get_batch_statistics(self, cluster_config: Dict[str, Any] = None): + """ + Get batch processing statistics for the cluster + + Args: + cluster_config: Cluster configuration dict + + Returns: + dict: Batch statistics + """ + try: + # Initialize clients + k8s_client = K8sClient() + + # Run async processing + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + result = loop.run_until_complete(_get_statistics_async(k8s_client, cluster_config)) + return result + finally: + loop.close() + + except Exception as exc: + logger.error(f"Error getting batch statistics: {str(exc)}", exc_info=True) + return { + 'error': str(exc), + 'status': 'failed', + 'timestamp': datetime.now().isoformat() + } + +async def _get_statistics_async(k8s_client: K8sClient, cluster_config: Dict[str, Any]): + """Async function to get batch statistics""" + try: + # Initialize K8s client + await k8s_client.initialize() + + # Get batch statistics + batch_stats = await batch_processing_service.get_batch_statistics(k8s_client) + + return { + 'timestamp': datetime.now().isoformat(), + 'statistics': batch_stats, + 'status': 'completed' + } + + except Exception as e: + logger.error(f"Error in async statistics: {e}", exc_info=True) + raise diff --git a/app/workers/celery_worker.py b/app/workers/celery_worker.py index 2816fe1..c86a78c 100644 --- a/app/workers/celery_worker.py +++ b/app/workers/celery_worker.py @@ -11,6 +11,10 @@ sys.path.insert(0, '/app') from app.celery_app import celery_app +# Import tasks to register them +from app.tasks.cluster_analysis import analyze_cluster +from app.tasks.batch_analysis import process_cluster_batch, get_batch_statistics + if __name__ == '__main__': # Start Celery worker celery_app.worker_main([ diff --git a/k8s/configmap.yaml b/k8s/configmap.yaml index 53a0d12..5892548 100644 --- a/k8s/configmap.yaml +++ b/k8s/configmap.yaml @@ -20,6 +20,11 @@ data: INCLUDE_SYSTEM_NAMESPACES: "false" SYSTEM_NAMESPACE_PREFIXES: '["kube-", "openshift-", "knative-", "default", "kube-system", "kube-public", "kube-node-lease"]' + # Configurações de batch processing + BATCH_SIZE: "100" + MAX_BATCH_SIZE: "500" + MIN_BATCH_SIZE: "10" + # URL do Prometheus PROMETHEUS_URL: "https://prometheus-k8s.openshift-monitoring.svc.cluster.local:9091" diff --git a/requirements.txt b/requirements.txt index f3cc122..610dd3d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,3 +17,4 @@ aiohttp==3.9.1 celery==5.3.4 redis==5.0.1 flower==2.0.1 +psutil==5.9.6