Files
openshift-resource-governance/app/services/report_service.py
andersonid 4d60c0e039 Initial commit: OpenShift Resource Governance Tool
- Implementa ferramenta completa de governança de recursos
- Backend Python com FastAPI para coleta de dados
- Validações seguindo best practices Red Hat
- Integração com Prometheus e VPA
- UI web interativa para visualização
- Relatórios em JSON, CSV e PDF
- Deploy como DaemonSet com RBAC
- Scripts de automação para build e deploy
2025-09-25 14:26:24 -03:00

307 lines
11 KiB
Python

"""
Serviço de geração de relatórios
"""
import logging
import json
import csv
import os
from datetime import datetime
from typing import List, Dict, Any, Optional
from io import StringIO
from app.models.resource_models import (
ClusterReport, NamespaceReport, ResourceValidation,
VPARecommendation, ExportRequest
)
from app.core.config import settings
logger = logging.getLogger(__name__)
class ReportService:
"""Serviço para geração de relatórios"""
def __init__(self):
self.export_path = settings.report_export_path
os.makedirs(self.export_path, exist_ok=True)
def generate_cluster_report(
self,
pods: List[Any],
validations: List[ResourceValidation],
vpa_recommendations: List[VPARecommendation],
overcommit_info: Dict[str, Any],
nodes_info: List[Dict[str, Any]]
) -> ClusterReport:
"""Gerar relatório do cluster"""
# Contar namespaces únicos
namespaces = set(pod.namespace for pod in pods)
# Gerar resumo
summary = self._generate_summary(validations, vpa_recommendations, overcommit_info)
report = ClusterReport(
timestamp=datetime.now().isoformat(),
total_pods=len(pods),
total_namespaces=len(namespaces),
total_nodes=len(nodes_info),
validations=validations,
vpa_recommendations=vpa_recommendations,
overcommit_info=overcommit_info,
summary=summary
)
return report
def generate_namespace_report(
self,
namespace: str,
pods: List[Any],
validations: List[ResourceValidation],
resource_usage: Dict[str, Any]
) -> NamespaceReport:
"""Gerar relatório de um namespace"""
# Filtrar validações do namespace
namespace_validations = [
v for v in validations if v.namespace == namespace
]
# Gerar recomendações
recommendations = self._generate_namespace_recommendations(namespace_validations)
report = NamespaceReport(
namespace=namespace,
timestamp=datetime.now().isoformat(),
total_pods=len(pods),
validations=namespace_validations,
resource_usage=resource_usage,
recommendations=recommendations
)
return report
def _generate_summary(
self,
validations: List[ResourceValidation],
vpa_recommendations: List[VPARecommendation],
overcommit_info: Dict[str, Any]
) -> Dict[str, Any]:
"""Gerar resumo do relatório"""
# Contar validações por severidade
severity_counts = {}
for validation in validations:
severity = validation.severity
if severity not in severity_counts:
severity_counts[severity] = 0
severity_counts[severity] += 1
# Contar validações por tipo
type_counts = {}
for validation in validations:
validation_type = validation.validation_type
if validation_type not in type_counts:
type_counts[validation_type] = 0
type_counts[validation_type] += 1
return {
"total_validations": len(validations),
"severity_breakdown": severity_counts,
"validation_types": type_counts,
"vpa_recommendations_count": len(vpa_recommendations),
"overcommit_detected": overcommit_info.get("overcommit_detected", False),
"critical_issues": severity_counts.get("critical", 0),
"warnings": severity_counts.get("warning", 0),
"errors": severity_counts.get("error", 0)
}
def _generate_namespace_recommendations(
self,
validations: List[ResourceValidation]
) -> List[str]:
"""Gerar recomendações para um namespace"""
recommendations = []
# Agrupar por tipo de problema
problems = {}
for validation in validations:
problem_type = validation.validation_type
if problem_type not in problems:
problems[problem_type] = []
problems[problem_type].append(validation)
# Gerar recomendações específicas
if "missing_requests" in problems:
count = len(problems["missing_requests"])
recommendations.append(
f"Criar LimitRange para definir requests padrão "
f"({count} containers sem requests)"
)
if "missing_limits" in problems:
count = len(problems["missing_limits"])
recommendations.append(
f"Definir limits para {count} containers para evitar consumo excessivo"
)
if "invalid_ratio" in problems:
count = len(problems["invalid_ratio"])
recommendations.append(
f"Ajustar ratio limit:request para {count} containers"
)
if "overcommit" in problems:
recommendations.append(
"Resolver overcommit de recursos no namespace"
)
return recommendations
async def export_report(
self,
report: ClusterReport,
export_request: ExportRequest
) -> str:
"""Exportar relatório em diferentes formatos"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
if export_request.format == "json":
return await self._export_json(report, timestamp)
elif export_request.format == "csv":
return await self._export_csv(report, timestamp)
elif export_request.format == "pdf":
return await self._export_pdf(report, timestamp)
else:
raise ValueError(f"Formato não suportado: {export_request.format}")
async def _export_json(self, report: ClusterReport, timestamp: str) -> str:
"""Exportar relatório em JSON"""
filename = f"cluster_report_{timestamp}.json"
filepath = os.path.join(self.export_path, filename)
# Converter para dict para serialização
report_dict = report.dict()
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(report_dict, f, indent=2, ensure_ascii=False)
logger.info(f"Relatório JSON exportado: {filepath}")
return filepath
async def _export_csv(self, report: ClusterReport, timestamp: str) -> str:
"""Exportar relatório em CSV"""
filename = f"cluster_report_{timestamp}.csv"
filepath = os.path.join(self.export_path, filename)
with open(filepath, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# Cabeçalho
writer.writerow([
"Pod Name", "Namespace", "Container Name",
"Validation Type", "Severity", "Message", "Recommendation"
])
# Dados das validações
for validation in report.validations:
writer.writerow([
validation.pod_name,
validation.namespace,
validation.container_name,
validation.validation_type,
validation.severity,
validation.message,
validation.recommendation or ""
])
logger.info(f"Relatório CSV exportado: {filepath}")
return filepath
async def _export_pdf(self, report: ClusterReport, timestamp: str) -> str:
"""Exportar relatório em PDF"""
try:
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.lib import colors
filename = f"cluster_report_{timestamp}.pdf"
filepath = os.path.join(self.export_path, filename)
doc = SimpleDocTemplate(filepath, pagesize=letter)
styles = getSampleStyleSheet()
story = []
# Título
title = Paragraph("OpenShift Resource Governance Report", styles['Title'])
story.append(title)
story.append(Spacer(1, 12))
# Resumo
summary_text = f"""
<b>Resumo do Cluster:</b><br/>
Total de Pods: {report.total_pods}<br/>
Total de Namespaces: {report.total_namespaces}<br/>
Total de Nós: {report.total_nodes}<br/>
Total de Validações: {report.summary['total_validations']}<br/>
Problemas Críticos: {report.summary['critical_issues']}<br/>
"""
story.append(Paragraph(summary_text, styles['Normal']))
story.append(Spacer(1, 12))
# Tabela de validações
if report.validations:
data = [["Pod", "Namespace", "Container", "Tipo", "Severidade", "Mensagem"]]
for validation in report.validations[:50]: # Limitar a 50 para PDF
data.append([
validation.pod_name,
validation.namespace,
validation.container_name,
validation.validation_type,
validation.severity,
validation.message[:50] + "..." if len(validation.message) > 50 else validation.message
])
table = Table(data)
table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, 0), 14),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('BACKGROUND', (0, 1), (-1, -1), colors.beige),
('GRID', (0, 0), (-1, -1), 1, colors.black)
]))
story.append(Paragraph("<b>Validações:</b>", styles['Heading2']))
story.append(table)
doc.build(story)
logger.info(f"Relatório PDF exportado: {filepath}")
return filepath
except ImportError:
logger.error("reportlab não instalado. Instale com: pip install reportlab")
raise ValueError("PDF export requer reportlab")
def get_exported_reports(self) -> List[Dict[str, str]]:
"""Listar relatórios exportados"""
reports = []
for filename in os.listdir(self.export_path):
if filename.endswith(('.json', '.csv', '.pdf')):
filepath = os.path.join(self.export_path, filename)
stat = os.stat(filepath)
reports.append({
"filename": filename,
"filepath": filepath,
"size": stat.st_size,
"created": datetime.fromtimestamp(stat.st_ctime).isoformat(),
"format": filename.split('.')[-1]
})
return sorted(reports, key=lambda x: x["created"], reverse=True)