Source code for core.ai.providers.openai_provider

"""OpenAI-compatible API provider (works with OpenAI, Azure, and compatible endpoints)."""
from __future__ import annotations

import json
import time
import urllib.request
import urllib.error
from typing import Dict, List

from core.ai.providers.base import AIProvider
from core.logger import get_logger

_logger = get_logger("ai.openai")


class OpenAIProvider(AIProvider):
    """Provider that talks to the OpenAI Chat Completions API.

    Also works with any OpenAI-compatible endpoint (e.g. Azure, Together,
    Groq, local vLLM) by changing *base_url*.
    """

    def __init__(self, api_key: str = "", model: str = "gpt-4o-mini",
                 base_url: str = "https://api.openai.com/v1"):
        self.api_key = api_key
        self.model = model
        self.base_url = base_url.rstrip("/")

    # ------------------------------------------------------------------
    # AIProvider interface
    # ------------------------------------------------------------------

    MAX_RETRIES = 3
    RETRY_BASE_DELAY = 2.0  # seconds

    def chat(self, messages: List[Dict[str, str]], temperature: float = 0.7,
             max_tokens: int = 4096, **kwargs) -> str:
        body = {
            "model": self.model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
            "stream": False,
        }
        for attempt in range(self.MAX_RETRIES):
            try:
                data = self._request("/chat/completions", body)
                return data["choices"][0]["message"]["content"]
            except urllib.error.HTTPError as e:
                if e.code == 429 and attempt < self.MAX_RETRIES - 1:
                    delay = self.RETRY_BASE_DELAY * (2 ** attempt)
                    _logger.warning(
                        f"Rate limited (429), retrying in {delay:.0f}s "
                        f"(attempt {attempt + 1}/{self.MAX_RETRIES})"
                    )
                    time.sleep(delay)
                    continue
                _logger.error("OpenAI chat error", error=str(e))
                return f"[Error] {e}"
            except Exception as e:
                _logger.error("OpenAI chat error", error=str(e))
                return f"[Error] {e}"
        return "[Error] Max retries exceeded"

    def chat_stream(self, messages: List[Dict[str, str]], temperature: float = 0.7,
                    max_tokens: int = 4096, **kwargs):
        body = {
            "model": self.model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
            "stream": True,
        }
        url = f"{self.base_url}/chat/completions"
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
        }
        req = urllib.request.Request(
            url,
            data=json.dumps(body).encode("utf-8"),
            headers=headers,
            method="POST",
        )
        for attempt in range(self.MAX_RETRIES):
            try:
                with urllib.request.urlopen(req, timeout=120) as resp:
                    for raw_line in resp:
                        line = raw_line.decode("utf-8").strip()
                        if not line or not line.startswith("data:"):
                            continue
                        payload = line[len("data:"):].strip()
                        if payload == "[DONE]":
                            break
                        try:
                            chunk = json.loads(payload)
                            delta = chunk["choices"][0].get("delta", {})
                            content = delta.get("content", "")
                            if content:
                                yield content
                        except (json.JSONDecodeError, KeyError, IndexError):
                            continue
                return  # success, stop retrying
            except urllib.error.HTTPError as e:
                if e.code == 429 and attempt < self.MAX_RETRIES - 1:
                    delay = self.RETRY_BASE_DELAY * (2 ** attempt)
                    _logger.warning(
                        f"Rate limited (429), retrying in {delay:.0f}s "
                        f"(attempt {attempt + 1}/{self.MAX_RETRIES})"
                    )
                    time.sleep(delay)
                    continue
                _logger.error("OpenAI stream error", error=str(e))
                yield f"\n[Error] {e}"
                return
            except Exception as e:
                _logger.error("OpenAI stream error", error=str(e))
                yield f"\n[Error] {e}"
                return

    def is_available(self) -> bool:
        return bool(self.api_key)

    def model_name(self) -> str:
        return self.model

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _request(self, endpoint: str, body: dict) -> dict:
        url = f"{self.base_url}{endpoint}"
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
        }
        req = urllib.request.Request(
            url,
            data=json.dumps(body).encode("utf-8"),
            headers=headers,
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=120) as resp:
            return json.loads(resp.read().decode("utf-8"))
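
A minimal usage sketch (not part of the module source): the API key, model
names, and the local vLLM URL below are placeholder assumptions, shown only to
illustrate how *base_url* switches the provider between OpenAI and an
OpenAI-compatible endpoint.

    from core.ai.providers.openai_provider import OpenAIProvider

    # Hosted OpenAI endpoint; blocking request via chat().
    provider = OpenAIProvider(api_key="sk-placeholder", model="gpt-4o-mini")
    if provider.is_available():
        reply = provider.chat([{"role": "user", "content": "Hello"}])
        print(reply)

    # Same class against a hypothetical local vLLM server; stream tokens.
    local = OpenAIProvider(api_key="EMPTY",
                           model="my-local-model",
                           base_url="http://localhost:8000/v1")
    for token in local.chat_stream([{"role": "user", "content": "Hi"}]):
        print(token, end="", flush=True)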