Files
openshift-resource-governance/app/tasks/batch_analysis.py

227 lines
8.2 KiB
Python

"""
Celery tasks for batch processing of large clusters
"""
import asyncio
import logging
from typing import Dict, Any, List
from datetime import datetime
import os
from app.celery_app import celery_app
from app.services.batch_processing import batch_processing_service, BatchProgress
from app.core.kubernetes_client import K8sClient
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, name='app.tasks.batch_analysis.process_cluster_batch')
def process_cluster_batch(self, cluster_config: Dict[str, Any] = None):
"""
Process cluster analysis in batches for large clusters
Args:
cluster_config: Cluster configuration dict
Returns:
dict: Batch processing results
"""
try:
# Update task state
self.update_state(
state='PROGRESS',
meta={
'current': 0,
'total': 1,
'status': 'Starting batch processing...',
'batch_number': 0,
'total_batches': 0,
'pods_processed': 0,
'total_pods': 0
}
)
# Initialize clients
k8s_client = K8sClient()
# Run async processing
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(_process_cluster_async(self, k8s_client, cluster_config))
return result
finally:
loop.close()
except Exception as exc:
logger.error(f"Batch processing failed: {str(exc)}", exc_info=True)
return {
'error': str(exc),
'status': 'failed',
'timestamp': datetime.now().isoformat()
}
async def _process_cluster_async(task, k8s_client: K8sClient, cluster_config: Dict[str, Any]):
"""Async processing function"""
try:
# Initialize K8s client
await k8s_client.initialize()
# Get batch statistics
batch_stats = await batch_processing_service.get_batch_statistics(k8s_client)
# Update task with statistics
task.update_state(
state='PROGRESS',
meta={
'current': 1,
'total': batch_stats.get('total_batches', 1),
'status': f"Processing {batch_stats.get('total_pods', 0)} pods in {batch_stats.get('total_batches', 0)} batches...",
'batch_number': 0,
'total_batches': batch_stats.get('total_batches', 0),
'pods_processed': 0,
'total_pods': batch_stats.get('total_pods', 0),
'statistics': batch_stats
}
)
# Process in batches
all_validations = []
all_recommendations = []
total_errors = []
total_processing_time = 0
batch_count = 0
async for batch_result in batch_processing_service.process_cluster_in_batches(
k8s_client,
namespace=cluster_config.get('namespace') if cluster_config else None,
include_system_namespaces=cluster_config.get('include_system_namespaces', False) if cluster_config else False,
progress_callback=lambda progress: _update_task_progress(task, progress)
):
batch_count += 1
# Collect results
all_validations.extend(batch_result.validations)
all_recommendations.extend(batch_result.recommendations)
total_errors.extend(batch_result.errors)
total_processing_time += batch_result.processing_time
# Update task progress
task.update_state(
state='PROGRESS',
meta={
'current': batch_count,
'total': batch_result.total_batches,
'status': f"Completed batch {batch_count}/{batch_result.total_batches} - {len(all_validations)} validations found",
'batch_number': batch_count,
'total_batches': batch_result.total_batches,
'pods_processed': batch_count * batch_processing_service.batch_size,
'total_pods': batch_stats.get('total_pods', 0),
'validations_found': len(all_validations),
'recommendations_generated': len(all_recommendations),
'processing_time': total_processing_time,
'memory_usage': batch_result.memory_usage,
'errors': len(total_errors)
}
)
# Final results
results = {
'timestamp': datetime.now().isoformat(),
'total_pods': batch_stats.get('total_pods', 0),
'total_batches': batch_count,
'batch_size': batch_processing_service.batch_size,
'total_validations': len(all_validations),
'total_recommendations': len(all_recommendations),
'total_errors': len(total_errors),
'processing_time': total_processing_time,
'statistics': batch_stats,
'validations': all_validations,
'recommendations': all_recommendations,
'errors': total_errors,
'status': 'completed'
}
logger.info(f"Batch processing completed: {len(all_validations)} validations, {len(all_recommendations)} recommendations in {total_processing_time:.2f}s")
return results
except Exception as e:
logger.error(f"Error in async batch processing: {e}", exc_info=True)
raise
def _update_task_progress(task, progress: BatchProgress):
"""Update Celery task progress"""
try:
task.update_state(
state='PROGRESS',
meta={
'current': progress.current_batch,
'total': progress.total_batches,
'status': f"Processing batch {progress.current_batch}/{progress.total_batches} - {progress.pods_processed}/{progress.total_pods} pods",
'batch_number': progress.current_batch,
'total_batches': progress.total_batches,
'pods_processed': progress.pods_processed,
'total_pods': progress.total_pods,
'validations_found': progress.validations_found,
'recommendations_generated': progress.recommendations_generated,
'processing_time': progress.processing_time,
'estimated_completion': progress.estimated_completion.isoformat() if progress.estimated_completion else None
}
)
except Exception as e:
logger.warning(f"Error updating task progress: {e}")
@celery_app.task(bind=True, name='app.tasks.batch_analysis.get_batch_statistics')
def get_batch_statistics(self, cluster_config: Dict[str, Any] = None):
"""
Get batch processing statistics for the cluster
Args:
cluster_config: Cluster configuration dict
Returns:
dict: Batch statistics
"""
try:
# Initialize clients
k8s_client = K8sClient()
# Run async processing
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(_get_statistics_async(k8s_client, cluster_config))
return result
finally:
loop.close()
except Exception as exc:
logger.error(f"Error getting batch statistics: {str(exc)}", exc_info=True)
return {
'error': str(exc),
'status': 'failed',
'timestamp': datetime.now().isoformat()
}
async def _get_statistics_async(k8s_client: K8sClient, cluster_config: Dict[str, Any]):
"""Async function to get batch statistics"""
try:
# Initialize K8s client
await k8s_client.initialize()
# Get batch statistics
batch_stats = await batch_processing_service.get_batch_statistics(k8s_client)
return {
'timestamp': datetime.now().isoformat(),
'statistics': batch_stats,
'status': 'completed'
}
except Exception as e:
logger.error(f"Error in async statistics: {e}", exc_info=True)
raise