feat: implement batch processing for large clusters (100 pods per batch) with memory optimization and progress tracking

This commit is contained in:
2025-10-15 16:22:40 -03:00
parent 4c6ce49526
commit 93a7a0988a
7 changed files with 718 additions and 0 deletions

View File

@@ -1738,6 +1738,199 @@ async def health_check():
"version": "1.0.0"
}
# ============================================================================
# BATCH PROCESSING ENDPOINTS - For Large Clusters (10,000+ pods)
# ============================================================================
@api_router.get("/batch/statistics")
async def get_batch_statistics(
    k8s_client=Depends(get_k8s_client)
):
    """Queue the Celery task that computes batch statistics for the cluster.

    The statistics are not computed here; the endpoint only enqueues the
    task and hands back its id. ``k8s_client`` is resolved through the
    ``get_k8s_client`` dependency but is not used in the body.

    Returns:
        dict: ``task_id``, ``status`` ("started"), ``message`` and an
        ISO-formatted ``timestamp``.

    Raises:
        HTTPException: 500 when the task cannot be imported or enqueued.
    """
    try:
        # Lazy import, aliased so the Celery task is not confused with this
        # endpoint function, which carries the same name.
        from app.tasks.batch_analysis import get_batch_statistics as statistics_task

        queued = statistics_task.delay()
        response = {
            "task_id": queued.id,
            "status": "started",
            "message": "Batch statistics calculation started",
            "timestamp": datetime.now().isoformat()
        }
    except Exception as exc:
        logger.error(f"Error starting batch statistics: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
    return response
@api_router.get("/batch/statistics/{task_id}")
async def get_batch_statistics_result(task_id: str):
    """Report the state or the result of a batch statistics task.

    Args:
        task_id: Id returned when the statistics task was started.

    Returns:
        dict: Always carries ``task_id`` and ``status``:

        * ``"pending"``  - the task has not finished and has published no
          progress yet (Celery also reports PENDING for unknown ids).
        * ``"progress"`` - the task published a PROGRESS state; ``meta``
          holds what it published.
        * ``"success"``  - the task returned; ``result`` holds its value.
        * ``"error"``    - any other state; ``error`` is ``str(task.info)``.

    Raises:
        HTTPException: 500 when the task state cannot be read.
    """
    try:
        from app.tasks.batch_analysis import get_batch_statistics

        task = get_batch_statistics.AsyncResult(task_id)
        # Read the state once so every branch sees the same value instead of
        # querying the result backend again for each comparison.
        state = task.state

        # RECEIVED, STARTED and RETRY are non-terminal Celery states: the
        # task is still alive, so they must not be reported as errors.
        if state in ('PENDING', 'RECEIVED', 'STARTED', 'RETRY'):
            return {
                "task_id": task_id,
                "status": "pending",
                "message": "Task is still processing..."
            }
        if state == 'PROGRESS':
            return {
                "task_id": task_id,
                "status": "progress",
                "message": "Task is in progress...",
                "meta": task.info
            }
        if state == 'SUCCESS':
            return {
                "task_id": task_id,
                "status": "success",
                "result": task.result
            }
        # FAILURE, REVOKED and anything unexpected.
        return {
            "task_id": task_id,
            "status": "error",
            "error": str(task.info)
        }
    except Exception as e:
        logger.error(f"Error getting batch statistics result: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@api_router.post("/batch/process")
async def start_batch_processing(
    namespace: Optional[str] = None,
    include_system_namespaces: bool = False,
    batch_size: int = 100
):
    """Start batch processing for large clusters.

    Args:
        namespace: Optional namespace to restrict processing to.
        include_system_namespaces: Whether system namespaces are included.
        batch_size: Requested pods per batch; must be between 10 and 500.

    Returns:
        dict: ``task_id`` of the queued Celery task plus the echoed request
        parameters and an ISO-formatted ``timestamp``.

    Raises:
        HTTPException: 400 when ``batch_size`` is out of range; 500 when the
            task cannot be imported or enqueued.
    """
    # Validate before entering the try block: raised inside it, the 400 would
    # be caught by the generic handler below and turned into a 500.
    if not 10 <= batch_size <= 500:
        raise HTTPException(
            status_code=400,
            detail="Batch size must be between 10 and 500"
        )

    try:
        from app.tasks.batch_analysis import process_cluster_batch

        # NOTE(review): batch_size is forwarded in the task config, but the
        # task code visible alongside this endpoint only reads 'namespace'
        # and 'include_system_namespaces' - confirm the value is honoured.
        task = process_cluster_batch.delay({
            'namespace': namespace,
            'include_system_namespaces': include_system_namespaces,
            'batch_size': batch_size
        })
        return {
            "task_id": task.id,
            "status": "started",
            "message": f"Batch processing started with batch size {batch_size}",
            "namespace": namespace,
            "include_system_namespaces": include_system_namespaces,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        logger.error(f"Error starting batch processing: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@api_router.get("/batch/process/{task_id}")
async def get_batch_processing_result(task_id: str):
    """Report the state or the result of a batch processing task.

    Args:
        task_id: Id returned when batch processing was started.

    Returns:
        dict: Always carries ``task_id`` and ``status``:

        * ``"pending"``  - the task has not finished and has published no
          progress yet (Celery also reports PENDING for unknown ids).
        * ``"progress"`` - the task published a PROGRESS state; ``meta``
          holds what it published.
        * ``"success"``  - the task returned; ``result`` holds its value.
        * ``"error"``    - any other state; ``error`` is ``str(task.info)``.

    Raises:
        HTTPException: 500 when the task state cannot be read.
    """
    try:
        from app.tasks.batch_analysis import process_cluster_batch

        task = process_cluster_batch.AsyncResult(task_id)
        # Read the state once so every branch sees the same value instead of
        # querying the result backend again for each comparison.
        state = task.state

        # RECEIVED, STARTED and RETRY are non-terminal Celery states: the
        # task is still alive, so they must not be reported as errors.
        if state in ('PENDING', 'RECEIVED', 'STARTED', 'RETRY'):
            return {
                "task_id": task_id,
                "status": "pending",
                "message": "Task is still processing..."
            }
        if state == 'PROGRESS':
            return {
                "task_id": task_id,
                "status": "progress",
                "message": "Task is in progress...",
                "meta": task.info
            }
        if state == 'SUCCESS':
            return {
                "task_id": task_id,
                "status": "success",
                "result": task.result
            }
        # FAILURE, REVOKED and anything unexpected.
        return {
            "task_id": task_id,
            "status": "error",
            "error": str(task.info)
        }
    except Exception as e:
        logger.error(f"Error getting batch processing result: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@api_router.get("/batch/validations")
async def get_batch_validations(
    namespace: Optional[str] = None,
    severity: Optional[str] = None,
    page: int = 1,
    page_size: int = 50,
    include_system_namespaces: bool = False,
    k8s_client=Depends(get_k8s_client)
):
    """Get validations using batch processing for large clusters.

    The cluster is processed batch by batch inside the request, the
    validations are collected (optionally filtered by severity) and one page
    of them is returned.

    Args:
        namespace: Optional namespace to restrict processing to.
        severity: When given, only validations whose ``severity`` equals
            this value are kept.
        page: 1-based page number.
        page_size: Number of validations per page; at least 1.
        include_system_namespaces: Whether system namespaces are included.
        k8s_client: Kubernetes client supplied by ``get_k8s_client``.

    Returns:
        dict: ``validations`` (the requested page), ``pagination`` metadata,
        ``processing_method``, ``batch_size`` and an ISO ``timestamp``.

    Raises:
        HTTPException: 400 when ``page`` or ``page_size`` is below 1; 500
            when batch processing fails.
    """
    # Reject bad paging up front: page_size == 0 would divide by zero below
    # and a page below 1 would yield negative slice bounds. Done outside the
    # try block so the 400 is not rewritten into a 500.
    if page < 1 or page_size < 1:
        raise HTTPException(
            status_code=400,
            detail="page and page_size must be greater than or equal to 1"
        )

    try:
        from app.services.batch_processing import batch_processing_service

        all_validations = []
        async for batch_result in batch_processing_service.process_cluster_in_batches(
            k8s_client,
            namespace=namespace,
            include_system_namespaces=include_system_namespaces
        ):
            if severity:
                # Filter per batch so non-matching validations are never
                # accumulated across the whole cluster.
                all_validations.extend(
                    v for v in batch_result.validations
                    if v.get('severity') == severity
                )
            else:
                all_validations.extend(batch_result.validations)

        # Pagination
        total = len(all_validations)
        start = (page - 1) * page_size
        end = start + page_size
        paginated_validations = all_validations[start:end]

        return {
            "validations": paginated_validations,
            "pagination": {
                "page": page,
                "page_size": page_size,
                "total": total,
                # Ceiling division without floats.
                "total_pages": (total + page_size - 1) // page_size
            },
            "processing_method": "batch",
            "batch_size": batch_processing_service.batch_size,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        logger.error(f"Error getting batch validations: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# OPTIMIZED ENDPOINTS - 10x Performance Improvement
# ============================================================================

View File

@@ -57,6 +57,11 @@ class Settings(BaseSettings):
enable_rbac: bool = True
service_account_name: str = "resource-governance-sa"
# Batch processing settings
batch_size: int = Field(default=100, alias="BATCH_SIZE")
max_batch_size: int = Field(default=500, alias="MAX_BATCH_SIZE")
min_batch_size: int = Field(default=10, alias="MIN_BATCH_SIZE")
class Config:
env_file = ".env"
case_sensitive = False

View File

@@ -0,0 +1,284 @@
"""
Batch Processing Service for Large Clusters
This service implements intelligent batch processing to handle large clusters
efficiently by processing pods in batches (100 pods per batch by default),
reducing memory usage and improving performance for clusters with 10,000+ pods.
"""
import asyncio
import logging
from typing import List, Dict, Any, Optional, AsyncGenerator, Tuple
from dataclasses import dataclass
from datetime import datetime
import gc
from app.core.kubernetes_client import K8sClient, PodResource
from app.services.validation_service import ValidationService
from app.services.smart_recommendations import SmartRecommendationsService
from app.services.historical_analysis import HistoricalAnalysisService
logger = logging.getLogger(__name__)
@dataclass
class BatchResult:
    """Outcome of processing one batch of pods.

    Attributes:
        batch_number: Position of this batch in the run.
        total_batches: Number of batches in the run.
        pods_processed: Number of pods contained in this batch.
        validations: One dict per validation produced for the batch.
        recommendations: One dict per recommendation produced for the batch.
        processing_time: Time spent on the batch, in seconds.
        memory_usage: Process memory reading taken when the batch finished.
        errors: Messages for failures encountered while processing.
    """

    # Batch identification
    batch_number: int
    total_batches: int
    pods_processed: int

    # Findings
    validations: List[Dict[str, Any]]
    recommendations: List[Dict[str, Any]]

    # Diagnostics
    processing_time: float
    memory_usage: float
    errors: List[str]
@dataclass
class BatchProgress:
    """Snapshot of how far a batch run has progressed.

    Attributes:
        current_batch: Number of the batch the snapshot refers to.
        total_batches: Number of batches in the run.
        pods_processed: Pods handled so far.
        total_pods: Pods in the run.
        validations_found: Count of validations reported with the snapshot.
        recommendations_generated: Count of recommendations reported with
            the snapshot.
        processing_time: Processing time reported with the snapshot.
        estimated_completion: Expected finish time, or None when unknown.
        status: One of 'running', 'completed', 'error', 'paused'.
    """

    # Position in the run
    current_batch: int
    total_batches: int
    pods_processed: int
    total_pods: int

    # Findings so far
    validations_found: int
    recommendations_generated: int

    # Timing
    processing_time: float
    estimated_completion: Optional[datetime]

    # 'running', 'completed', 'error', 'paused'
    status: str
class BatchProcessingService:
    """Service for processing large clusters in batches.

    The pod list is fetched once and cut into slices of ``batch_size`` pods.
    Each slice is validated pod by pod and handed to the smart
    recommendations service; the outcome of every slice is yielded as a
    ``BatchResult`` so callers can consume results incrementally.
    """

    def __init__(self, batch_size: int = 100):
        """Create the service and the services it delegates to.

        Args:
            batch_size: Number of pods per batch; must be at least 1.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1 (it is used as a
                divisor when computing the number of batches).
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self.batch_size = batch_size
        self.validation_service = ValidationService()
        self.smart_recommendations_service = SmartRecommendationsService()
        self.historical_service = HistoricalAnalysisService()

    async def process_cluster_in_batches(
        self,
        k8s_client: K8sClient,
        namespace: Optional[str] = None,
        include_system_namespaces: bool = False,
        progress_callback: Optional[callable] = None
    ) -> AsyncGenerator[BatchResult, None]:
        """
        Process cluster pods in batches with progress tracking

        Args:
            k8s_client: Kubernetes client instance
            namespace: Optional namespace filter
            include_system_namespaces: Whether to include system namespaces
                (only consulted when no namespace filter is given)
            progress_callback: Optional callable invoked synchronously with a
                BatchProgress after each batch, before the batch is yielded

        Yields:
            BatchResult: Results for each batch processed

        Raises:
            Exception: Any error raised while listing pods, processing a
                batch or running the callback is logged and re-raised.
        """
        try:
            # Get all pods
            if namespace:
                namespace_resources = await k8s_client.get_namespace_resources(namespace)
                all_pods = namespace_resources.pods
            else:
                all_pods = await k8s_client.get_all_pods(include_system_namespaces=include_system_namespaces)

            total_pods = len(all_pods)
            # Ceiling division; zero pods gives zero batches and the loop
            # below does not run.
            total_batches = (total_pods + self.batch_size - 1) // self.batch_size
            logger.info(f"Starting batch processing: {total_pods} pods in {total_batches} batches of {self.batch_size}")

            # Running totals reported through the progress callback.
            validations_found = 0
            recommendations_generated = 0

            for batch_num in range(total_batches):
                start_idx = batch_num * self.batch_size
                end_idx = min(start_idx + self.batch_size, total_pods)
                batch_pods = all_pods[start_idx:end_idx]

                batch_result = await self._process_batch(
                    batch_num + 1,
                    total_batches,
                    batch_pods,
                    start_idx,
                    total_pods
                )

                # batch_result is a single BatchResult, not a collection, so
                # the totals are accumulated here instead of iterating it.
                validations_found += len(batch_result.validations)
                recommendations_generated += len(batch_result.recommendations)

                if progress_callback:
                    progress = BatchProgress(
                        current_batch=batch_num + 1,
                        total_batches=total_batches,
                        pods_processed=end_idx,
                        total_pods=total_pods,
                        validations_found=validations_found,
                        recommendations_generated=recommendations_generated,
                        # Time spent on the batch that just finished.
                        processing_time=batch_result.processing_time,
                        estimated_completion=None,  # Could calculate based on avg time
                        status='running'
                    )
                    progress_callback(progress)

                yield batch_result

                # Memory cleanup after each batch
                await self._cleanup_memory()
                # Small delay to prevent overwhelming the system
                await asyncio.sleep(0.1)
        except Exception as e:
            logger.error(f"Error in batch processing: {e}", exc_info=True)
            raise

    async def _process_batch(
        self,
        batch_number: int,
        total_batches: int,
        pods: List[PodResource],
        start_idx: int,
        total_pods: int
    ) -> BatchResult:
        """Process a single batch of pods.

        Failures are collected rather than raised: a pod whose validation
        fails, or a failing recommendation call, adds a message to
        ``errors`` and processing continues.

        Args:
            batch_number: 1-based number of this batch.
            total_batches: Number of batches in the run.
            pods: Pods belonging to this batch.
            start_idx: Index of the first pod of the batch in the full pod
                list (accepted but not used by the body).
            total_pods: Number of pods in the run (accepted but not used by
                the body).

        Returns:
            BatchResult: Validations, recommendations, timing, memory
            reading and error messages for the batch.
        """
        start_time = datetime.now()
        errors = []
        validations = []
        recommendations = []

        try:
            logger.info(f"Processing batch {batch_number}/{total_batches}: {len(pods)} pods")

            # Validate each pod on its own so one bad pod does not discard
            # the findings for the rest of the batch.
            for pod in pods:
                try:
                    pod_validations = self.validation_service.validate_pod_resources(pod)
                    for validation in pod_validations:
                        validations.append({
                            'pod_name': validation.pod_name,
                            'namespace': validation.namespace,
                            'container_name': validation.container_name,
                            'validation_type': validation.validation_type,
                            'severity': validation.severity,
                            'message': validation.message,
                            'recommendation': validation.recommendation,
                            'priority_score': validation.priority_score,
                            'workload_category': validation.workload_category,
                            'estimated_impact': validation.estimated_impact
                        })
                except Exception as e:
                    error_msg = f"Error validating pod {pod.name}: {str(e)}"
                    logger.warning(error_msg)
                    errors.append(error_msg)

            # Generate smart recommendations for this batch
            try:
                batch_recommendations = await self.smart_recommendations_service.generate_smart_recommendations(pods, [])
                for rec in batch_recommendations:
                    recommendations.append({
                        'workload_name': rec.workload_name,
                        'namespace': rec.namespace,
                        'recommendation_type': rec.recommendation_type,
                        'priority_score': rec.priority_score,
                        'title': rec.title,
                        'description': rec.description,
                        'estimated_impact': rec.estimated_impact,
                        'implementation_effort': rec.implementation_effort
                    })
            except Exception as e:
                error_msg = f"Error generating recommendations for batch {batch_number}: {str(e)}"
                logger.warning(error_msg)
                errors.append(error_msg)

            processing_time = (datetime.now() - start_time).total_seconds()
            return BatchResult(
                batch_number=batch_number,
                total_batches=total_batches,
                pods_processed=len(pods),
                validations=validations,
                recommendations=recommendations,
                processing_time=processing_time,
                memory_usage=self._get_memory_usage(),
                errors=errors
            )
        except Exception as e:
            # Last-resort handler: report the batch as empty instead of
            # aborting the whole run.
            processing_time = (datetime.now() - start_time).total_seconds()
            error_msg = f"Error processing batch {batch_number}: {str(e)}"
            logger.error(error_msg, exc_info=True)
            return BatchResult(
                batch_number=batch_number,
                total_batches=total_batches,
                pods_processed=len(pods),
                validations=[],
                recommendations=[],
                processing_time=processing_time,
                memory_usage=self._get_memory_usage(),
                errors=[error_msg]
            )

    async def _cleanup_memory(self):
        """Run a garbage collection pass and briefly yield to the event loop.

        Best effort: any failure is logged as a warning and ignored.
        """
        try:
            # Force garbage collection
            gc.collect()
            # Small delay to allow memory cleanup
            await asyncio.sleep(0.01)
        except Exception as e:
            logger.warning(f"Error during memory cleanup: {e}")

    def _get_memory_usage(self) -> float:
        """Return the resident set size of this process in MB.

        Returns:
            float: RSS in megabytes, or 0.0 when psutil is not installed or
            the reading fails.
        """
        try:
            # psutil is optional, hence the import at call time.
            import psutil
            process = psutil.Process()
            return process.memory_info().rss / 1024 / 1024  # Convert to MB
        except Exception:
            # Covers ImportError as well; the reading is purely informational.
            return 0.0

    async def get_batch_statistics(self, k8s_client: K8sClient) -> Dict[str, Any]:
        """Get statistics about batch processing for the cluster.

        Counts the pods outside system namespaces and derives how many
        batches of ``self.batch_size`` they would need.

        Args:
            k8s_client: Kubernetes client instance.

        Returns:
            dict: Pod, namespace and batch counts, a per-namespace pod
            distribution, a time estimate and a recommended batch size. On
            failure a dict with ``error`` and zeroed ``total_pods`` /
            ``total_batches`` is returned instead of raising.
        """
        try:
            all_pods = await k8s_client.get_all_pods(include_system_namespaces=False)
            total_pods = len(all_pods)
            total_batches = (total_pods + self.batch_size - 1) // self.batch_size

            # Group by namespace
            namespace_counts = {}
            for pod in all_pods:
                namespace_counts[pod.namespace] = namespace_counts.get(pod.namespace, 0) + 1

            return {
                'total_pods': total_pods,
                'total_namespaces': len(namespace_counts),
                'batch_size': self.batch_size,
                'total_batches': total_batches,
                'estimated_processing_time': total_batches * 2.0,  # 2 seconds per batch estimate
                'namespace_distribution': namespace_counts,
                'memory_efficiency': 'High' if total_batches > 10 else 'Standard',
                'recommended_batch_size': self._recommend_batch_size(total_pods)
            }
        except Exception as e:
            logger.error(f"Error getting batch statistics: {e}", exc_info=True)
            return {
                'error': str(e),
                'total_pods': 0,
                'total_batches': 0
            }

    def _recommend_batch_size(self, total_pods: int) -> int:
        """Recommend a batch size based on cluster size.

        Args:
            total_pods: Number of pods in the cluster.

        Returns:
            int: 50 below 1,000 pods, 100 below 5,000, 150 below 10,000 and
            200 otherwise.
        """
        if total_pods < 1000:
            return 50
        if total_pods < 5000:
            return 100
        if total_pods < 10000:
            return 150
        return 200
# Global instance: module-level BatchProcessingService built with the default
# batch size; other modules import it by name from this module.
batch_processing_service = BatchProcessingService()

226
app/tasks/batch_analysis.py Normal file
View File

@@ -0,0 +1,226 @@
"""
Celery tasks for batch processing of large clusters
"""
import asyncio
import logging
from typing import Dict, Any, List
from datetime import datetime
import os
from app.celery_app import celery_app
from app.services.batch_processing import batch_processing_service, BatchProgress
from app.core.kubernetes_client import K8sClient
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, name='app.tasks.batch_analysis.process_cluster_batch')
def process_cluster_batch(self, cluster_config: Dict[str, Any] = None):
    """
    Process cluster analysis in batches for large clusters

    Args:
        cluster_config: Cluster configuration dict, or None for defaults

    Returns:
        dict: Batch processing results. When processing fails the error is
        not raised; a dict with 'error', 'status' ('failed') and 'timestamp'
        is returned instead.
    """
    try:
        # Publish an initial state so pollers see activity straight away.
        self.update_state(
            state='PROGRESS',
            meta={
                'current': 0,
                'total': 1,
                'status': 'Starting batch processing...',
                'batch_number': 0,
                'total_batches': 0,
                'pods_processed': 0,
                'total_pods': 0
            }
        )

        # Initialize clients
        k8s_client = K8sClient()

        # asyncio.run creates a fresh event loop, finalizes async generators
        # and closes the loop afterwards, so no closed loop is left installed
        # as the thread's current loop for later tasks in this worker.
        return asyncio.run(_process_cluster_async(self, k8s_client, cluster_config))
    except Exception as exc:
        logger.error(f"Batch processing failed: {str(exc)}", exc_info=True)
        return {
            'error': str(exc),
            'status': 'failed',
            'timestamp': datetime.now().isoformat()
        }
async def _process_cluster_async(task, k8s_client: K8sClient, cluster_config: Dict[str, Any]):
    """Run batch processing and publish progress on the Celery task.

    Args:
        task: Bound Celery task used for ``update_state`` calls.
        k8s_client: Kubernetes client; it is initialized here.
        cluster_config: Optional dict; the keys 'namespace' and
            'include_system_namespaces' are read from it. May be None.

    Returns:
        dict: Totals, the statistics gathered up front, and the collected
        validations, recommendations and error messages.

    Raises:
        Exception: Any failure is logged and re-raised to the caller.
    """
    try:
        # Initialize K8s client
        await k8s_client.initialize()

        # Get batch statistics
        batch_stats = await batch_processing_service.get_batch_statistics(k8s_client)

        # Update task with statistics
        task.update_state(
            state='PROGRESS',
            meta={
                'current': 1,
                'total': batch_stats.get('total_batches', 1),
                'status': f"Processing {batch_stats.get('total_pods', 0)} pods in {batch_stats.get('total_batches', 0)} batches...",
                'batch_number': 0,
                'total_batches': batch_stats.get('total_batches', 0),
                'pods_processed': 0,
                'total_pods': batch_stats.get('total_pods', 0),
                'statistics': batch_stats
            }
        )

        # A missing config behaves like an empty one.
        config = cluster_config or {}

        all_validations = []
        all_recommendations = []
        total_errors = []
        total_processing_time = 0
        batch_count = 0
        pods_processed = 0

        # NOTE(review): total_pods below comes from the statistics call,
        # which ignores the namespace / include_system_namespaces filters
        # applied to the run itself - confirm whether that is intended.
        async for batch_result in batch_processing_service.process_cluster_in_batches(
            k8s_client,
            namespace=config.get('namespace'),
            include_system_namespaces=config.get('include_system_namespaces', False),
            progress_callback=lambda progress: _update_task_progress(task, progress)
        ):
            batch_count += 1
            # Sum the real batch sizes: multiplying the batch count by the
            # configured size over-counts when the final batch is partial.
            pods_processed += batch_result.pods_processed

            # Collect results
            all_validations.extend(batch_result.validations)
            all_recommendations.extend(batch_result.recommendations)
            total_errors.extend(batch_result.errors)
            total_processing_time += batch_result.processing_time

            # Update task progress
            task.update_state(
                state='PROGRESS',
                meta={
                    'current': batch_count,
                    'total': batch_result.total_batches,
                    'status': f"Completed batch {batch_count}/{batch_result.total_batches} - {len(all_validations)} validations found",
                    'batch_number': batch_count,
                    'total_batches': batch_result.total_batches,
                    'pods_processed': pods_processed,
                    'total_pods': batch_stats.get('total_pods', 0),
                    'validations_found': len(all_validations),
                    'recommendations_generated': len(all_recommendations),
                    'processing_time': total_processing_time,
                    'memory_usage': batch_result.memory_usage,
                    'errors': len(total_errors)
                }
            )

        # Final results
        results = {
            'timestamp': datetime.now().isoformat(),
            'total_pods': batch_stats.get('total_pods', 0),
            'total_batches': batch_count,
            'batch_size': batch_processing_service.batch_size,
            'total_validations': len(all_validations),
            'total_recommendations': len(all_recommendations),
            'total_errors': len(total_errors),
            'processing_time': total_processing_time,
            'statistics': batch_stats,
            'validations': all_validations,
            'recommendations': all_recommendations,
            'errors': total_errors,
            'status': 'completed'
        }
        logger.info(f"Batch processing completed: {len(all_validations)} validations, {len(all_recommendations)} recommendations in {total_processing_time:.2f}s")
        return results
    except Exception as e:
        logger.error(f"Error in async batch processing: {e}", exc_info=True)
        raise
def _update_task_progress(task, progress: BatchProgress):
    """Publish a BatchProgress snapshot as the task's PROGRESS state.

    Best effort: a failure while building or publishing the state is logged
    as a warning and otherwise ignored.

    Args:
        task: Object exposing ``update_state`` (the bound Celery task).
        progress: Snapshot to publish.
    """
    try:
        eta = progress.estimated_completion
        summary = (
            f"Processing batch {progress.current_batch}/{progress.total_batches}"
            f" - {progress.pods_processed}/{progress.total_pods} pods"
        )
        meta = {
            'current': progress.current_batch,
            'total': progress.total_batches,
            'status': summary,
            'batch_number': progress.current_batch,
            'total_batches': progress.total_batches,
            'pods_processed': progress.pods_processed,
            'total_pods': progress.total_pods,
            'validations_found': progress.validations_found,
            'recommendations_generated': progress.recommendations_generated,
            'processing_time': progress.processing_time,
            # The ETA is optional; publish it as an ISO string when present.
            'estimated_completion': eta.isoformat() if eta else None
        }
        task.update_state(state='PROGRESS', meta=meta)
    except Exception as e:
        logger.warning(f"Error updating task progress: {e}")
@celery_app.task(bind=True, name='app.tasks.batch_analysis.get_batch_statistics')
def get_batch_statistics(self, cluster_config: Dict[str, Any] = None):
    """
    Get batch processing statistics for the cluster

    Args:
        cluster_config: Cluster configuration dict, or None

    Returns:
        dict: Batch statistics. When gathering them fails the error is not
        raised; a dict with 'error', 'status' ('failed') and 'timestamp' is
        returned instead.
    """
    try:
        # Initialize clients
        k8s_client = K8sClient()

        # asyncio.run creates a fresh event loop and closes it afterwards,
        # so no closed loop is left installed as the thread's current loop
        # for later tasks in this worker.
        return asyncio.run(_get_statistics_async(k8s_client, cluster_config))
    except Exception as exc:
        logger.error(f"Error getting batch statistics: {str(exc)}", exc_info=True)
        return {
            'error': str(exc),
            'status': 'failed',
            'timestamp': datetime.now().isoformat()
        }
async def _get_statistics_async(k8s_client: K8sClient, cluster_config: Dict[str, Any]):
    """Initialize the client and fetch batch statistics.

    Args:
        k8s_client: Kubernetes client; it is initialized here.
        cluster_config: Accepted for symmetry with the task signature; the
            body does not read it.

    Returns:
        dict: 'timestamp' (ISO string), 'statistics' and 'status'
        ('completed').

    Raises:
        Exception: Any failure is logged and re-raised to the caller.
    """
    try:
        await k8s_client.initialize()
        statistics = await batch_processing_service.get_batch_statistics(k8s_client)
        payload = {
            'timestamp': datetime.now().isoformat(),
            'statistics': statistics,
            'status': 'completed'
        }
        return payload
    except Exception as err:
        logger.error(f"Error in async statistics: {err}", exc_info=True)
        raise

View File

@@ -11,6 +11,10 @@ sys.path.insert(0, '/app')
from app.celery_app import celery_app
# Import tasks to register them
from app.tasks.cluster_analysis import analyze_cluster
from app.tasks.batch_analysis import process_cluster_batch, get_batch_statistics
if __name__ == '__main__':
# Start Celery worker
celery_app.worker_main([