Initial commit: OpenShift Resource Governance Tool

- Implementa ferramenta completa de governança de recursos
- Backend Python com FastAPI para coleta de dados
- Validações seguindo best practices Red Hat
- Integração com Prometheus e VPA
- UI web interativa para visualização
- Relatórios em JSON, CSV e PDF
- Deploy como DaemonSet com RBAC
- Scripts de automação para build e deploy
This commit is contained in:
2025-09-25 14:26:24 -03:00
commit 4d60c0e039
31 changed files with 3386 additions and 0 deletions

1
app/core/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Core modules

45
app/core/config.py Normal file
View File

@@ -0,0 +1,45 @@
"""
Configurações da aplicação
"""
import os
from typing import List, Optional
from pydantic import BaseSettings
class Settings(BaseSettings):
    """Application settings, loaded from the environment (and an optional .env file).

    Backed by pydantic's BaseSettings: each field can be overridden by an
    environment variable of the same name (matched case-insensitively,
    per the Config below).
    """

    # OpenShift/Kubernetes connection. When kubeconfig_path is None the
    # consumer is expected to fall back to in-cluster configuration.
    kubeconfig_path: Optional[str] = None
    cluster_url: Optional[str] = None
    token: Optional[str] = None

    # Prometheus endpoint (in-cluster service DNS name by default).
    prometheus_url: str = "http://prometheus.openshift-monitoring.svc.cluster.local:9090"

    # Validation thresholds.
    cpu_limit_ratio: float = 3.0  # default limit:request ratio for CPU
    memory_limit_ratio: float = 3.0  # default limit:request ratio for memory
    min_cpu_request: str = "10m"  # minimum CPU request
    min_memory_request: str = "32Mi"  # minimum memory request

    # Namespaces considered critical for VPA analysis.
    critical_namespaces: List[str] = [
        "openshift-monitoring",
        "openshift-ingress",
        "openshift-apiserver",
        "openshift-controller-manager",
        "openshift-sdn"
    ]

    # Report output settings.
    report_export_path: str = "/tmp/reports"

    # Security settings.
    enable_rbac: bool = True
    service_account_name: str = "resource-governance-sa"

    class Config:
        # pydantic BaseSettings configuration: read overrides from ".env",
        # match environment variable names case-insensitively.
        env_file = ".env"
        case_sensitive = False


# Shared singleton imported by the rest of the application.
settings = Settings()

View File

@@ -0,0 +1,234 @@
"""
Cliente Kubernetes/OpenShift para coleta de dados
"""
import logging
from typing import List, Dict, Any, Optional
from kubernetes import client, config
from kubernetes.client.rest import ApiException
import asyncio
import aiohttp
from app.core.config import settings
from app.models.resource_models import PodResource, NamespaceResources, VPARecommendation
logger = logging.getLogger(__name__)
class K8sClient:
    """Client for collecting resource data from a Kubernetes/OpenShift cluster.

    All collection methods require a successful initialize() call first and
    raise RuntimeError otherwise.
    """

    def __init__(self):
        # API handles are created lazily in initialize(); None until then.
        self.v1 = None              # CoreV1Api (pods, nodes)
        self.autoscaling_v1 = None  # AutoscalingV1Api
        self.apps_v1 = None         # AppsV1Api
        self.initialized = False

    async def initialize(self):
        """Initialize the Kubernetes API clients.

        Uses the kubeconfig at settings.kubeconfig_path when provided,
        otherwise the in-cluster service-account configuration.

        Raises:
            Exception: re-raised when no configuration can be loaded.
        """
        try:
            if settings.kubeconfig_path:
                config.load_kube_config(config_file=settings.kubeconfig_path)
            else:
                # Running inside a pod: use the mounted service account.
                config.load_incluster_config()
            self.v1 = client.CoreV1Api()
            self.autoscaling_v1 = client.AutoscalingV1Api()
            self.apps_v1 = client.AppsV1Api()
            self.initialized = True
            logger.info("Cliente Kubernetes inicializado com sucesso")
        except Exception as e:
            logger.error(f"Erro ao inicializar cliente Kubernetes: {e}")
            raise

    @staticmethod
    def _extract_container_resources(container) -> Dict[str, Any]:
        """Convert a container spec into a plain dict of name/image/resources.

        A missing resources section (or missing requests/limits) yields empty
        dicts, so callers never see None. Shared by get_all_pods and
        get_namespace_resources (previously duplicated inline).
        """
        container_resource = {
            "name": container.name,
            "image": container.image,
            "resources": {
                "requests": {},
                "limits": {}
            }
        }
        if container.resources:
            if container.resources.requests:
                container_resource["resources"]["requests"] = dict(container.resources.requests)
            if container.resources.limits:
                container_resource["resources"]["limits"] = dict(container.resources.limits)
        return container_resource

    def _build_pod_resource(self, pod) -> PodResource:
        """Map a V1Pod onto the project's PodResource model."""
        pod_resource = PodResource(
            name=pod.metadata.name,
            namespace=pod.metadata.namespace,
            node_name=pod.spec.node_name,
            phase=pod.status.phase,
            containers=[]
        )
        for container in pod.spec.containers:
            pod_resource.containers.append(self._extract_container_resources(container))
        return pod_resource

    async def get_all_pods(self) -> List[PodResource]:
        """Collect resource information for every pod in the cluster.

        Returns:
            One PodResource per pod, across all namespaces.

        Raises:
            RuntimeError: if initialize() has not succeeded.
            ApiException: on API errors while listing pods.
        """
        if not self.initialized:
            raise RuntimeError("Cliente Kubernetes não inicializado")
        try:
            pods = self.v1.list_pod_for_all_namespaces(watch=False)
            pods_data = [self._build_pod_resource(pod) for pod in pods.items]
            logger.info(f"Coletados {len(pods_data)} pods")
            return pods_data
        except ApiException as e:
            logger.error(f"Erro ao listar pods: {e}")
            raise

    async def get_namespace_resources(self, namespace: str) -> NamespaceResources:
        """Collect pod resources for a single namespace.

        NOTE(review): the total_* fields are initialized to "0" and never
        aggregated here — presumably summed elsewhere; confirm with callers.

        Raises:
            RuntimeError: if initialize() has not succeeded.
            ApiException: on API errors while listing the namespace's pods.
        """
        if not self.initialized:
            raise RuntimeError("Cliente Kubernetes não inicializado")
        try:
            pods = self.v1.list_namespaced_pod(namespace=namespace)
            namespace_resource = NamespaceResources(
                name=namespace,
                pods=[],
                total_cpu_requests="0",
                total_cpu_limits="0",
                total_memory_requests="0",
                total_memory_limits="0"
            )
            for pod in pods.items:
                namespace_resource.pods.append(self._build_pod_resource(pod))
            return namespace_resource
        except ApiException as e:
            logger.error(f"Erro ao coletar recursos do namespace {namespace}: {e}")
            raise

    async def get_vpa_recommendations(self) -> List[VPARecommendation]:
        """Collect VerticalPodAutoscaler recommendations.

        Returns an empty list (instead of raising) when VPA is unavailable.
        """
        if not self.initialized:
            raise RuntimeError("Cliente Kubernetes não inicializado")
        recommendations = []
        try:
            # NOTE(review): the official AutoscalingV1Api covers HPA only;
            # VPA is a CRD normally accessed via CustomObjectsApi. If this
            # method does not exist on the client, the AttributeError is now
            # caught below so the "VPA not installed -> empty list" contract
            # actually holds (previously only ApiException was caught and the
            # AttributeError would escape).
            vpa_list = self.autoscaling_v1.list_vertical_pod_autoscaler_for_all_namespaces()
            for vpa in vpa_list.items:
                # Keep only VPAs that have already produced a recommendation.
                if vpa.status and vpa.status.recommendation:
                    recommendations.append(VPARecommendation(
                        name=vpa.metadata.name,
                        namespace=vpa.metadata.namespace,
                        target_ref=vpa.spec.target_ref,
                        recommendations=vpa.status.recommendation
                    ))
            logger.info(f"Coletadas {len(recommendations)} recomendações VPA")
            return recommendations
        except (ApiException, AttributeError) as e:
            logger.error(f"Erro ao coletar recomendações VPA: {e}")
            # VPA may not be installed; degrade gracefully.
            return []

    async def get_nodes_info(self) -> List[Dict[str, Any]]:
        """Collect name, labels, capacity, allocatable and conditions per node.

        Raises:
            RuntimeError: if initialize() has not succeeded.
            ApiException: on API errors while listing nodes.
        """
        if not self.initialized:
            raise RuntimeError("Cliente Kubernetes não inicializado")
        try:
            nodes = self.v1.list_node()
            nodes_info = []
            for node in nodes.items:
                node_info = {
                    "name": node.metadata.name,
                    "labels": node.metadata.labels or {},
                    "capacity": {},
                    "allocatable": {},
                    "conditions": []
                }
                if node.status.capacity:
                    node_info["capacity"] = dict(node.status.capacity)
                if node.status.allocatable:
                    node_info["allocatable"] = dict(node.status.allocatable)
                if node.status.conditions:
                    node_info["conditions"] = [
                        {
                            "type": condition.type,
                            "status": condition.status,
                            "reason": condition.reason,
                            "message": condition.message
                        }
                        for condition in node.status.conditions
                    ]
                nodes_info.append(node_info)
            return nodes_info
        except ApiException as e:
            logger.error(f"Erro ao coletar informações dos nós: {e}")
            raise

View File

@@ -0,0 +1,131 @@
"""
Cliente Prometheus para coleta de métricas
"""
import logging
import aiohttp
import asyncio
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from app.core.config import settings
logger = logging.getLogger(__name__)
class PrometheusClient:
    """Async client for the Prometheus HTTP API.

    All query helpers degrade gracefully: when Prometheus is unreachable
    they return an error dict instead of raising.
    """

    def __init__(self):
        self.base_url = settings.prometheus_url
        self.session = None      # aiohttp.ClientSession, created in initialize()
        self.initialized = False

    async def initialize(self):
        """Open the HTTP session and probe Prometheus with a trivial query.

        On failure the client stays usable but marked unavailable
        (initialized=False), and the half-opened session is released —
        previously it was leaked until close().
        """
        try:
            self.session = aiohttp.ClientSession()
            # Smoke-test the connection with a cheap instant query.
            async with self.session.get(f"{self.base_url}/api/v1/query?query=up") as response:
                if response.status == 200:
                    self.initialized = True
                    logger.info("Cliente Prometheus inicializado com sucesso")
                else:
                    logger.warning(f"Prometheus retornou status {response.status}")
        except Exception as e:
            logger.error(f"Erro ao inicializar cliente Prometheus: {e}")
            # Prometheus may simply be absent: continue without it, but do
            # not leak the session.
            self.initialized = False
            if self.session:
                await self.session.close()
                self.session = None

    async def query(self, query: str, time: Optional[datetime] = None) -> Dict[str, Any]:
        """Run an instant PromQL query, optionally evaluated at `time`.

        Returns:
            The raw Prometheus response dict on success, or an error dict
            ({"status": "error", "message": ...}) when Prometheus is
            unavailable or the request fails.
        """
        if not self.initialized or not self.session:
            return {"status": "error", "message": "Prometheus não disponível"}
        try:
            params = {"query": query}
            if time is not None:
                # Serialize as a string: query-string parameter values must
                # be text (int was passed before).
                params["time"] = str(int(time.timestamp()))
            async with self.session.get(
                f"{self.base_url}/api/v1/query",
                params=params
            ) as response:
                if response.status == 200:
                    return await response.json()
                logger.error(f"Erro na query Prometheus: {response.status}")
                return {"status": "error", "message": f"HTTP {response.status}"}
        except Exception as e:
            logger.error(f"Erro ao executar query Prometheus: {e}")
            return {"status": "error", "message": str(e)}

    async def get_pod_cpu_usage(self, namespace: str, pod_name: str) -> Dict[str, Any]:
        """CPU usage rate (5-minute window) for one pod."""
        query = f'rate(container_cpu_usage_seconds_total{{namespace="{namespace}", pod="{pod_name}"}}[5m])'
        return await self.query(query)

    async def get_pod_memory_usage(self, namespace: str, pod_name: str) -> Dict[str, Any]:
        """Working-set memory for one pod."""
        query = f'container_memory_working_set_bytes{{namespace="{namespace}", pod="{pod_name}"}}'
        return await self.query(query)

    async def get_namespace_resource_usage(self, namespace: str) -> Dict[str, Any]:
        """Aggregate CPU and memory usage for one namespace."""
        cpu_query = f'sum(rate(container_cpu_usage_seconds_total{{namespace="{namespace}"}}[5m]))'
        memory_query = f'sum(container_memory_working_set_bytes{{namespace="{namespace}"}})'
        cpu_result = await self.query(cpu_query)
        memory_result = await self.query(memory_query)
        return {
            "cpu": cpu_result,
            "memory": memory_result
        }

    async def get_cluster_overcommit(self) -> Dict[str, Any]:
        """Compare cluster capacity against scheduled requests for CPU/memory."""
        cpu_capacity_query = 'sum(kube_node_status_capacity{resource="cpu"})'
        cpu_requests_query = 'sum(kube_pod_container_resource_requests{resource="cpu"})'
        memory_capacity_query = 'sum(kube_node_status_capacity{resource="memory"})'
        memory_requests_query = 'sum(kube_pod_container_resource_requests{resource="memory"})'
        cpu_capacity = await self.query(cpu_capacity_query)
        cpu_requests = await self.query(cpu_requests_query)
        memory_capacity = await self.query(memory_capacity_query)
        memory_requests = await self.query(memory_requests_query)
        return {
            "cpu": {
                "capacity": cpu_capacity,
                "requests": cpu_requests
            },
            "memory": {
                "capacity": memory_capacity,
                "requests": memory_requests
            }
        }

    async def get_node_resource_usage(self) -> Dict[str, Any]:
        """Raw per-node capacity/request series.

        Annotation fixed: this returns the raw response dict from query()
        (the previous List[Dict[str, Any]] annotation did not match the code).
        """
        query = '''
        (
            kube_node_status_capacity{resource="cpu"} or
            kube_node_status_capacity{resource="memory"} or
            kube_pod_container_resource_requests{resource="cpu"} or
            kube_pod_container_resource_requests{resource="memory"}
        )
        '''
        result = await self.query(query)
        return result

    async def close(self):
        """Close the HTTP session and mark the client unavailable.

        Resetting session/initialized means queries issued after close()
        return the standard error dict instead of failing on a closed session.
        """
        if self.session:
            await self.session.close()
            self.session = None
            self.initialized = False