import time
from typing import Any
from urllib.parse import urlencode

import requests
from msal import ConfidentialClientApplication, PublicClientApplication

from auth.graph_endpoints import GraphEndpoint
from config import (
    APPLICATION_SCOPES,
    AUTH_MODE,
    AZURE_CLIENT_ID,
    AZURE_CLIENT_SECRET,
    AZURE_TENANT_ID,
    DELEGATED_SCOPES,
    GRAPH_API_VERSION,
    GRAPH_BASE_URL,
)


def create_graph_client() -> "GraphClient":
    """Build a GraphClient whose auth mode follows the AUTH_MODE setting.

    Any AUTH_MODE value other than "application" yields a delegated client.
    """
    mode = "application" if AUTH_MODE == "application" else "delegated"
    return GraphClient(mode=mode)


class GraphClient:
    """
    HTTP client for the Microsoft Graph API.

    Official pattern:
    {HTTP method} https://graph.microsoft.com/{version}/{resource}?{query-parameters}
    """

    # Maximum number of attempts when Graph answers 429 (throttled).
    _MAX_ATTEMPTS = 5
    # Seconds to wait when a 429 response carries no usable Retry-After.
    _DEFAULT_RETRY_AFTER = 5
    # Per-request timeout, in seconds.
    _TIMEOUT_SECONDS = 60
    # Renew the token this many seconds before its announced expiry.
    _TOKEN_EXPIRY_MARGIN_SECONDS = 60

    def __init__(self, mode: str | None = None) -> None:
        """Create the MSAL application matching *mode*.

        Args:
            mode: "application" selects a confidential client (client
                secret); any other value selects a public client. Defaults
                to AUTH_MODE when omitted or empty.
        """
        self.mode = mode or AUTH_MODE
        authority = f"https://login.microsoftonline.com/{AZURE_TENANT_ID}"
        self._token: str | None = None
        # time.monotonic() deadline of the cached token; None means the
        # lifetime is unknown and the cached token is reused as-is.
        self._token_expires_at: float | None = None

        if self.mode == "application":
            self._app = ConfidentialClientApplication(
                client_id=AZURE_CLIENT_ID,
                client_credential=AZURE_CLIENT_SECRET,
                authority=authority,
            )
        else:
            self._app = PublicClientApplication(
                client_id=AZURE_CLIENT_ID,
                authority=authority,
            )

    def _ensure_token(self) -> str:
        """Return a valid access token, acquiring a new one when needed.

        The token is cached on the instance and reused until shortly before
        the expiry reported by MSAL ("expires_in").

        Raises:
            RuntimeError: if MSAL does not return an access token.
        """
        if self._token:
            deadline = self._token_expires_at
            if deadline is None or time.monotonic() < deadline:
                return self._token

        if self.mode == "application":
            result = self._app.acquire_token_for_client(scopes=APPLICATION_SCOPES)
        else:
            result = None
            accounts = self._app.get_accounts()
            if accounts:
                result = self._app.acquire_token_silent(
                    DELEGATED_SCOPES, account=accounts[0]
                )
            # acquire_token_silent returns None when the cache holds no
            # usable token; fall back to the interactive device flow.
            if not result:
                result = self._acquire_token_interactive()

        result = result or {}
        if "access_token" not in result:
            error = result.get(
                "error_description", result.get("error", "Error desconocido")
            )
            raise RuntimeError(f"No se pudo autenticar con Microsoft Graph: {error}")

        self._token = result["access_token"]
        try:
            lifetime = float(result.get("expires_in"))
        except (TypeError, ValueError):
            # Lifetime not reported: keep the token until replaced.
            self._token_expires_at = None
        else:
            usable = max(lifetime - self._TOKEN_EXPIRY_MARGIN_SECONDS, 0.0)
            self._token_expires_at = time.monotonic() + usable
        return self._token

    def _acquire_token_interactive(self) -> dict:
        """Run the device-code flow, printing the sign-in instructions.

        Raises:
            RuntimeError: if the device flow cannot be initiated.
        """
        flow = self._app.initiate_device_flow(scopes=DELEGATED_SCOPES)
        if "user_code" not in flow:
            raise RuntimeError(f"No se pudo iniciar el flujo de login: {flow}")

        print("\n" + "=" * 50)
        print(flow["message"])
        print("=" * 50 + "\n")

        return self._app.acquire_token_by_device_flow(flow)

    def build_url(self, resource: str, query_parameters: dict | None = None) -> str:
        """Build the request URL following the Microsoft Graph pattern."""
        path = resource.lstrip("/")
        url = f"{GRAPH_BASE_URL}/{path}"
        if query_parameters:
            url = f"{url}?{urlencode(query_parameters)}"
        return url

    @classmethod
    def _retry_delay(cls, response: Any) -> int:
        """Return the seconds to wait after a throttled (429) response.

        Uses the Retry-After header when it is an integer number of seconds;
        a missing or non-numeric value (e.g. an HTTP-date) falls back to the
        default delay instead of raising.
        """
        try:
            delay = int(response.headers.get("Retry-After"))
        except (TypeError, ValueError):
            return cls._DEFAULT_RETRY_AFTER
        # time.sleep rejects negative values.
        return max(delay, 0)

    def _send(
        self,
        method: str,
        url: str,
        *,
        headers: dict[str, str],
        label: str,
        params: dict | None = None,
        json_body: dict | None = None,
    ) -> dict[str, Any]:
        """Send one HTTP request, retrying while Graph answers 429.

        Args:
            method: HTTP verb, already upper-cased.
            url: absolute URL to call.
            headers: request headers (must carry the Authorization header).
            label: text identifying the request in the retry-exhausted error.
            params: extra query parameters appended to *url* by requests.
            json_body: JSON payload, if any.

        Returns:
            The decoded JSON body, or {} for 204 / empty responses.

        Raises:
            requests.HTTPError: for non-429 error statuses.
            RuntimeError: when every attempt was throttled.
        """
        for _ in range(self._MAX_ATTEMPTS):
            response = requests.request(
                method=method,
                url=url,
                headers=headers,
                params=params,
                json=json_body,
                timeout=self._TIMEOUT_SECONDS,
            )
            if response.status_code == 429:
                time.sleep(self._retry_delay(response))
                continue

            response.raise_for_status()
            if response.status_code == 204 or not response.content:
                return {}
            return response.json()

        raise RuntimeError(f"Demasiados reintentos para: {label}")

    def request(
        self,
        method: str,
        resource: str,
        query_parameters: dict | None = None,
        json_body: dict | None = None,
    ) -> dict[str, Any]:
        """
        Execute an HTTP request against Graph.

        Example:
            client.request("GET", "me/planner/plans")
            client.request("GET", "groups", {"$filter": "..."})

        Returns {} for 204 / empty responses. Raises requests.HTTPError on
        error statuses and RuntimeError when throttling retries run out.
        """
        verb = method.upper()
        url = self.build_url(resource, query_parameters)
        headers = {
            "Authorization": f"Bearer {self._ensure_token()}",
            "Content-Type": "application/json",
        }
        return self._send(
            verb,
            url,
            headers=headers,
            json_body=json_body,
            label=f"{verb} {url}",
        )

    def call_endpoint(self, endpoint: GraphEndpoint) -> dict[str, Any]:
        """Execute a predefined GraphEndpoint."""
        return self.request(
            method=endpoint.method,
            resource=endpoint.resource,
            query_parameters=endpoint.query_parameters or None,
        )

    def get(self, endpoint: str, params: dict | None = None) -> dict[str, Any]:
        """GET that accepts relative routes or full nextLink URLs."""
        if endpoint.startswith("http"):
            headers = {"Authorization": f"Bearer {self._ensure_token()}"}
            return self._send(
                "GET",
                endpoint,
                headers=headers,
                params=params,
                label=endpoint,
            )

        resource = endpoint.lstrip("/")
        return self.request("GET", resource, query_parameters=params)

    def get_all_pages(self, endpoint: str, params: dict | None = None) -> list[dict]:
        """Collect every item of a paginated result (@odata.nextLink).

        *params* are sent with the first request only. Each nextLink is
        followed verbatim, since the code assumes it already carries the
        paging state.
        """
        items: list[dict] = []
        data = self.get(endpoint, params=params)
        while True:
            items.extend(data.get("value", []))
            next_link = data.get("@odata.nextLink")
            if not next_link:
                return items
            data = self.get(next_link)

    def probe(self, endpoint: GraphEndpoint) -> dict[str, Any]:
        """
        Try an endpoint and return the outcome with HTTP metadata.
        Useful for diagnosing the connection.

        Never raises: failures are reported through "ok": False, the HTTP
        status (0 when no response was received) and an "error" message.
        """
        url = self.build_url(endpoint.resource, endpoint.query_parameters or None)
        report: dict[str, Any] = {
            "method": endpoint.method,
            "url": url,
            "description": endpoint.description,
        }

        try:
            data = self.call_endpoint(endpoint)
        except requests.HTTPError as e:
            response = e.response
            message = ""
            # A Response is falsy for 4xx/5xx statuses, so compare with None
            # rather than relying on its truthiness.
            if response is not None:
                try:
                    message = response.json().get("error", {}).get("message", "")
                except (ValueError, AttributeError, TypeError):
                    # Body is not JSON or not shaped like a Graph error.
                    message = ""
            report.update(
                ok=False,
                status=response.status_code if response is not None else 0,
                error=message or str(e),
            )
            return report
        except Exception as e:
            # Diagnostic boundary: report any other failure, do not raise.
            report.update(ok=False, status=0, error=str(e))
            return report

        count = len(data.get("value", [])) if "value" in data else 1
        # call_endpoint only returns the body, so success is reported as 200.
        report.update(
            ok=True,
            status=200,
            items=count,
            sample=data.get("value", [data])[:2],
        )
        return report