excel-planner/dashboard/tablero_server.py

636 lines
22 KiB
Python

"""
Servidor local para Tablero_proyectos_v7.html.
Sirve el HTML, expone /api/dashboard.tsv y notifica cambios via SSE
cuando Power Automate llama a /api/webhook/planner-changed.
"""
from __future__ import annotations
import argparse
import json
import socket
import sys
import threading
import time
import webbrowser
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from urllib.parse import parse_qs, urlparse
from config import (
BASE_DIR,
DASHBOARD_CACHE_SECONDS,
DASHBOARD_MIN_BACKGROUND_SECONDS,
DASHBOARD_REFRESH_SECONDS,
DASHBOARD_HOST,
DASHBOARD_OPEN_BROWSER,
DASHBOARD_PORT,
DASHBOARD_USE_EVENTS,
DASHBOARD_VERBOSE,
DASHBOARD_WEBHOOK_DEBOUNCE_SECONDS,
DASHBOARD_WEBHOOK_SECRET,
data_source_label,
validate_config,
)
from dashboard.event_hub import EventHub, iter_sse_messages
from dashboard.refresh_debouncer import RefreshDebouncer
from export.rtc_tsv_exporter import projects_to_tsv
from services.data_factory import create_data_service
WEB_DIR = BASE_DIR / "web"
ACTA_CONFIG_PATH = BASE_DIR / "config" / "acta_config.json"
DEFAULT_ACTA_CONFIG: dict = {
"titulo": "ACTA COMITÉ DE PROYECTOS RTC",
"subtitulo": "Reporte Automático de Sistema",
"elaboradoPor": "Alexander Morales",
"lugar": "Virtual / Presencial",
"descripcion": "Análisis de los proyectos activos y proyección operativa.",
"asistentes": [
{"nombre": "Jojn B. Casas", "cargo": "Gerente General"},
{"nombre": "Oscar Garcia", "cargo": "Director Operación RTC"},
{"nombre": "Ayda Franco", "cargo": "Líder Mesa de Servicios"},
{"nombre": "Alexander Morales", "cargo": "Líder Proyectos"},
],
}
_CLIENT_GONE_ERRORS = (BrokenPipeError, ConnectionResetError, ConnectionAbortedError)
def load_acta_config() -> dict:
if ACTA_CONFIG_PATH.is_file():
try:
data = json.loads(ACTA_CONFIG_PATH.read_text(encoding="utf-8"))
if isinstance(data, dict):
return data
except (OSError, json.JSONDecodeError):
pass
return dict(DEFAULT_ACTA_CONFIG)
def save_acta_config(data: dict) -> dict:
ACTA_CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
normalized = {
"titulo": str(data.get("titulo", DEFAULT_ACTA_CONFIG["titulo"])).strip(),
"subtitulo": str(data.get("subtitulo", DEFAULT_ACTA_CONFIG["subtitulo"])).strip(),
"elaboradoPor": str(data.get("elaboradoPor", DEFAULT_ACTA_CONFIG["elaboradoPor"])).strip(),
"lugar": str(data.get("lugar", DEFAULT_ACTA_CONFIG["lugar"])).strip(),
"descripcion": str(data.get("descripcion", DEFAULT_ACTA_CONFIG["descripcion"])).strip(),
"asistentes": [],
}
raw_asistentes = data.get("asistentes")
if isinstance(raw_asistentes, list):
for item in raw_asistentes:
if not isinstance(item, dict):
continue
nombre = str(item.get("nombre", "")).strip()
cargo = str(item.get("cargo", "")).strip()
if nombre or cargo:
normalized["asistentes"].append({"nombre": nombre, "cargo": cargo})
if not normalized["asistentes"]:
normalized["asistentes"] = list(DEFAULT_ACTA_CONFIG["asistentes"])
ACTA_CONFIG_PATH.write_text(
json.dumps(normalized, ensure_ascii=False, indent=2) + "\n",
encoding="utf-8",
)
return normalized
def get_local_ipv4_addresses() -> list[str]:
"""Obtiene IPs IPv4 de esta PC para compartir en la red local."""
addresses: set[str] = set()
try:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
probe.connect(("8.8.8.8", 80))
addresses.add(probe.getsockname()[0])
except OSError:
pass
try:
hostname = socket.gethostname()
for info in socket.getaddrinfo(hostname, None, socket.AF_INET):
addresses.add(info[4][0])
except OSError:
pass
addresses.discard("127.0.0.1")
return sorted(addresses)
def print_access_urls(host: str, port: int) -> None:
print(f"[OK] En este equipo: http://localhost:{port}/", flush=True)
if host in {"127.0.0.1", "localhost"}:
print(" Modo solo local (DASHBOARD_HOST=127.0.0.1)", flush=True)
return
ips = get_local_ipv4_addresses()
if ips:
print(" En la red local (otros PCs, Chrome/Edge):", flush=True)
for ip in ips:
print(f" -> http://{ip}:{port}/", flush=True)
primary_ip = ips[0]
print(f" Webhook PA: http://{primary_ip}:{port}/api/webhook/planner-changed", flush=True)
else:
print(" [AVISO] No se detecto IP de red. Usa ipconfig para ver tu IPv4.", flush=True)
print(f" Puerto: {port} (debe estar abierto en el firewall de Windows)", flush=True)
def schedule_open_browser(port: int) -> None:
"""Abre el tablero en el navegador predeterminado (Chrome/Edge) tras iniciar el servidor."""
url = f"http://localhost:{port}/"
def _open() -> None:
time.sleep(0.8)
opened = webbrowser.open(url, new=0)
if opened:
print(f"[OK] Navegador abierto: {url}", flush=True)
else:
print(f"[AVISO] Abre manualmente en el navegador: {url}", flush=True)
threading.Thread(target=_open, daemon=True).start()
class DashboardCache:
def __init__(
self,
ttl_seconds: int,
*,
min_background_seconds: int = 5,
on_updated=None,
) -> None:
self.ttl_seconds = ttl_seconds
self.min_background_seconds = min_background_seconds
self._on_updated = on_updated
self._data_lock = threading.Lock()
self._refresh_lock = threading.Lock()
self._refreshing = False
self._cached_at = 0.0
self._cached_tsv = ""
self._last_error = ""
self._last_background_at = 0.0
def _notify_updated(self) -> None:
if self._on_updated:
try:
self._on_updated()
except Exception:
pass
def _fetch_fresh(self) -> str:
service = create_data_service()
projects = service.get_all_projects()
return projects_to_tsv(projects)
def _cache_age(self) -> float:
if not self._cached_tsv:
return float("inf")
return time.time() - self._cached_at
def _is_stale(self) -> bool:
if not self._cached_tsv:
return True
return self._cache_age() >= self.ttl_seconds
def _should_background_refresh(self) -> bool:
if self._refreshing:
return False
since_bg = time.time() - self._last_background_at
if since_bg < self.min_background_seconds:
return False
return self._is_stale()
def _start_background_refresh(self) -> None:
with self._refresh_lock:
if self._refreshing:
return
self._refreshing = True
self._last_background_at = time.time()
def _run() -> None:
try:
self._refresh()
finally:
with self._refresh_lock:
self._refreshing = False
threading.Thread(target=_run, daemon=True).start()
def _refresh(self) -> tuple[str, bool]:
try:
fresh = self._fetch_fresh()
except Exception as exc:
self._last_error = str(exc)
with self._data_lock:
if self._cached_tsv:
return self._cached_tsv, True
raise
with self._data_lock:
self._cached_tsv = fresh
self._cached_at = time.time()
self._last_error = ""
self._notify_updated()
return self._cached_tsv, False
def get_tsv(self, *, force: bool = False) -> tuple[str, bool]:
"""Devuelve (tsv, from_cache). Sirve cache mientras Power Automate responde."""
with self._data_lock:
cached = self._cached_tsv
stale = self._is_stale()
if force or stale:
if self._should_background_refresh():
self._start_background_refresh()
if cached and not stale:
return cached, True
if cached:
return cached, True
with self._refresh_lock:
already_refreshing = self._refreshing
if already_refreshing:
deadline = time.time() + 5
while time.time() < deadline:
with self._data_lock:
if self._cached_tsv:
return self._cached_tsv, True
time.sleep(0.2)
return self._refresh()
@property
def last_error(self) -> str:
return self._last_error
def create_handler(
cache: DashboardCache,
directory: Path,
*,
event_hub: EventHub,
debouncer: RefreshDebouncer,
use_events: bool,
refresh_seconds: int,
):
def refresh_and_notify() -> None:
try:
cache._last_background_at = 0.0
cache._start_background_refresh()
print("[WEBHOOK] Actualizacion desde Planner programada.", flush=True)
except Exception as exc:
event_hub.broadcast("planner-error", json.dumps({"error": str(exc)}))
print(f"[WEBHOOK] Error al refrescar: {exc}", flush=True)
class TableroRequestHandler(SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=str(directory), **kwargs)
def _write_body(self, body: bytes) -> None:
try:
self.wfile.write(body)
except _CLIENT_GONE_ERRORS:
pass
def handle_one_request(self) -> None:
try:
super().handle_one_request()
except _CLIENT_GONE_ERRORS:
pass
def log_message(self, format: str, *args) -> None:
if args and str(args[0]).startswith(("GET /api/", "POST /api/")):
sys.stdout.write("[API] " + (format % args) + "\n")
return
super().log_message(format, *args)
def do_GET(self) -> None:
parsed = urlparse(self.path)
route = parsed.path
if route == "/api/dashboard.tsv":
self._serve_dashboard_tsv(parsed)
return
if route == "/api/health":
self._serve_health()
return
if route == "/api/acta-config":
self._serve_acta_config_get()
return
if route == "/api/events":
self._serve_sse()
return
if route == "/config.js":
self._serve_config_js()
return
if route in {"/", ""}:
self.path = "/Tablero_proyectos_v7.html"
super().do_GET()
def do_POST(self) -> None:
parsed = urlparse(self.path)
if parsed.path == "/api/webhook/planner-changed":
self._handle_planner_webhook(parsed)
return
if parsed.path == "/api/acta-config":
self._handle_acta_config_post()
return
self.send_error(404, "Not Found")
def _read_body(self) -> bytes:
length = int(self.headers.get("Content-Length", 0))
if length <= 0:
return b""
return self.rfile.read(length)
def _webhook_authorized(self, parsed) -> bool:
if not DASHBOARD_WEBHOOK_SECRET:
return True
query = parse_qs(parsed.query)
header_secret = self.headers.get("X-Webhook-Secret", "")
query_secret = query.get("secret", [""])[0]
return header_secret == DASHBOARD_WEBHOOK_SECRET or query_secret == DASHBOARD_WEBHOOK_SECRET
def _handle_planner_webhook(self, parsed) -> None:
if not self._webhook_authorized(parsed):
body = b'{"ok": false, "error": "Unauthorized"}'
self.send_response(401)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self._write_body(body)
return
raw_body = self._read_body()
if raw_body:
try:
payload = json.loads(raw_body.decode("utf-8"))
origin = payload.get("plan_id") or payload.get("tarea_id") or "planner"
except json.JSONDecodeError:
origin = "planner"
else:
origin = "planner"
print(f"[WEBHOOK] Cambio detectado en Planner ({origin}). Programando refresco...", flush=True)
debouncer.schedule(refresh_and_notify)
body = b'{"ok": true, "mensaje": "Actualizacion programada"}'
self.send_response(202)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self._write_body(body)
def _serve_dashboard_tsv(self, parsed) -> None:
force = "refresh" in parse_qs(parsed.query)
try:
tsv, from_cache = cache.get_tsv(force=force)
if not tsv.strip():
raise RuntimeError(
"Sin datos en cache. Power Automate aun no respondio; reintenta en unos segundos."
)
body = tsv.encode("utf-8")
cache_status = "HIT" if from_cache else "MISS"
self.send_response(200)
self.send_header("Content-Type", "text/tab-separated-values; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.send_header("Cache-Control", "no-store")
self.send_header("X-Cache", cache_status)
self.end_headers()
self._write_body(body)
except _CLIENT_GONE_ERRORS:
pass
except Exception as exc:
message = f"Error al obtener datos de Planner: {exc}"
body = message.encode("utf-8")
try:
self.send_response(502)
self.send_header("Content-Type", "text/plain; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self._write_body(body)
except _CLIENT_GONE_ERRORS:
pass
def _serve_health(self) -> None:
payload = {
"ok": True,
"events": use_events,
"sse_clients": event_hub.client_count,
}
body = json.dumps(payload).encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self._write_body(body)
def _serve_acta_config_get(self) -> None:
payload = load_acta_config()
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.send_header("Cache-Control", "no-store")
self.end_headers()
self._write_body(body)
def _handle_acta_config_post(self) -> None:
raw_body = self._read_body()
try:
data = json.loads(raw_body.decode("utf-8")) if raw_body else {}
except json.JSONDecodeError:
body = b'{"ok": false, "error": "JSON invalido"}'
self.send_response(400)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self._write_body(body)
return
try:
saved = save_acta_config(data)
except OSError as exc:
body = json.dumps(
{"ok": False, "error": str(exc)},
ensure_ascii=False,
).encode("utf-8")
self.send_response(500)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self._write_body(body)
return
body = json.dumps({"ok": True, "config": saved}, ensure_ascii=False).encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self._write_body(body)
def _serve_sse(self) -> None:
client_queue = event_hub.subscribe()
self.send_response(200)
self.send_header("Content-Type", "text/event-stream; charset=utf-8")
self.send_header("Cache-Control", "no-cache")
self.send_header("Connection", "keep-alive")
self.end_headers()
try:
for chunk in iter_sse_messages(client_queue):
try:
self.wfile.write(chunk)
self.wfile.flush()
except _CLIENT_GONE_ERRORS:
break
except OSError:
pass
finally:
event_hub.unsubscribe(client_queue)
def _serve_config_js(self) -> None:
refresh_ms = refresh_seconds * 1000
use_events_js = "true" if use_events else "false"
body = (
f"window.DASHBOARD_USE_EVENTS = {use_events_js};\n"
f"window.DASHBOARD_REFRESH_MS = {refresh_ms};\n"
f"window.DASHBOARD_FALLBACK_MS = {refresh_ms};\n"
).encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "application/javascript; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.send_header("Cache-Control", "no-store")
self.end_headers()
self._write_body(body)
return TableroRequestHandler
def run_server(
host: str | None = None,
port: int | None = None,
*,
open_browser: bool | None = None,
) -> None:
missing = validate_config()
if missing:
print("[ERROR] Faltan variables en .env:")
for var in missing:
print(f" - {var}")
sys.exit(1)
if not WEB_DIR.is_dir():
print(f"[ERROR] No existe la carpeta web: {WEB_DIR}")
sys.exit(1)
bind_host = host or DASHBOARD_HOST
bind_port = port or DASHBOARD_PORT
event_hub = EventHub()
def _broadcast_data_updated() -> None:
if not DASHBOARD_USE_EVENTS:
return
notified = event_hub.broadcast("planner-changed", "{}")
if DASHBOARD_VERBOSE and notified > 0:
print(f"[SYNC] Tablero notificado ({notified} cliente(s)).", flush=True)
cache = DashboardCache(
DASHBOARD_CACHE_SECONDS,
min_background_seconds=DASHBOARD_MIN_BACKGROUND_SECONDS,
on_updated=_broadcast_data_updated,
)
debouncer = RefreshDebouncer(DASHBOARD_WEBHOOK_DEBOUNCE_SECONDS)
handler = create_handler(
cache,
WEB_DIR,
event_hub=event_hub,
debouncer=debouncer,
use_events=DASHBOARD_USE_EVENTS,
refresh_seconds=DASHBOARD_REFRESH_SECONDS,
)
with ThreadingHTTPServer((bind_host, bind_port), handler) as httpd:
print("=" * 50, flush=True)
print(" Tablero RTC - Servidor de red", flush=True)
print(f" Fuente: {data_source_label()}", flush=True)
print("=" * 50, flush=True)
print_access_urls(bind_host, bind_port)
if DASHBOARD_USE_EVENTS:
print(" Modo: tiempo real (SSE + webhook de Planner)", flush=True)
print(f" Debounce webhook: {DASHBOARD_WEBHOOK_DEBOUNCE_SECONDS}s", flush=True)
if DASHBOARD_WEBHOOK_SECRET:
print(" Webhook: protegido con DASHBOARD_WEBHOOK_SECRET", flush=True)
else:
print(" [AVISO] Webhook sin secreto. Define DASHBOARD_WEBHOOK_SECRET en .env", flush=True)
if DASHBOARD_REFRESH_SECONDS > 0:
print(
f" Auto-actualizacion: cada {DASHBOARD_REFRESH_SECONDS}s "
f"(cache servidor {DASHBOARD_CACHE_SECONDS}s)",
flush=True,
)
else:
print(" Auto-actualizacion: desactivada (solo al abrir o webhook)", flush=True)
print(" Ctrl+C para detener", flush=True)
def _warm_cache() -> None:
print("[...] Cargando datos iniciales de Planner (puede tardar 1-3 min)...", flush=True)
with cache._refresh_lock:
cache._refreshing = True
try:
cache._last_background_at = 0.0
cache._refresh()
print("[OK] Cache inicial de Planner listo.", flush=True)
except Exception as exc:
print(f"[AVISO] Cache inicial no disponible: {exc}", flush=True)
finally:
with cache._refresh_lock:
cache._refreshing = False
threading.Thread(target=_warm_cache, daemon=True).start()
if DASHBOARD_REFRESH_SECONDS > 0:
def _periodic_sync() -> None:
while True:
time.sleep(DASHBOARD_REFRESH_SECONDS)
cache._start_background_refresh()
threading.Thread(target=_periodic_sync, daemon=True).start()
should_open = DASHBOARD_OPEN_BROWSER if open_browser is None else open_browser
if should_open:
schedule_open_browser(bind_port)
try:
httpd.serve_forever()
except KeyboardInterrupt:
print("\n[OK] Servidor detenido.")
def main() -> None:
parser = argparse.ArgumentParser(description="Servidor local del tablero HTML v7")
parser.add_argument("--host", default=None, help="Host (default: 0.0.0.0)")
parser.add_argument("--port", type=int, default=None, help="Puerto (default: 8765)")
parser.add_argument(
"--no-browser",
action="store_true",
help="No abrir el navegador automaticamente al iniciar",
)
args = parser.parse_args()
run_server(host=args.host, port=args.port, open_browser=not args.no_browser)
if __name__ == "__main__":
main()