Source code for core.components.http_request

from core.ecs import Component
import threading
import json
import urllib.request
import urllib.error


[docs] class HTTPRequestComponent(Component): """ Lightweight single-request component. Configure in the inspector or script, fire it, and poll for the result. Usage in scripts: req = self.entity.get_component(HTTPRequestComponent) # Configure and send req.url = "https://api.example.com/data" req.method = "GET" req.send() # In on_update, check if done if req.is_done(): print(req.status_code, req.response_body) if req.ok: data = req.json() # Or send with body req.url = "https://api.example.com/submit" req.method = "POST" req.request_body = '{"key": "value"}' req.send() """ METHOD_GET = "GET" METHOD_POST = "POST" METHOD_PUT = "PUT" METHOD_DELETE = "DELETE" METHOD_PATCH = "PATCH" _VALID_METHODS = {METHOD_GET, METHOD_POST, METHOD_PUT, METHOD_DELETE, METHOD_PATCH} def __init__( self, url: str = "", method: str = "GET", request_body: str = "", content_type: str = "application/json", timeout: float = 30.0, send_on_start: bool = False, ): self.entity = None self.url = str(url or "") self.method = method.upper() if method.upper() in self._VALID_METHODS else self.METHOD_GET self.request_body = str(request_body or "") self.content_type = str(content_type or "application/json") self.timeout = max(1.0, float(timeout)) self.send_on_start = bool(send_on_start) # Custom headers (set from script) self.headers: dict = {} # Response state (read-only from scripts) self.status_code: int = 0 self.response_body: str = "" self.response_headers: dict = {} self.error: str = "" # Internal self._thread: threading.Thread | None = None self._done = False self._sending = False self._send_on_start_handled = False # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------
[docs] def send(self): """Fire the HTTP request in a background thread.""" if self._sending: return if not self.url: self.error = "No URL specified" self._done = True return self._done = False self._sending = True self.status_code = 0 self.response_body = "" self.response_headers = {} self.error = "" self._thread = threading.Thread(target=self._do_request, daemon=True) self._thread.start()
[docs] def is_done(self) -> bool: """Return True if the request has completed (success or error).""" return self._done
[docs] def is_sending(self) -> bool: """Return True if a request is currently in flight.""" return self._sending and not self._done
@property def ok(self) -> bool: """Return True if the last request was successful (2xx).""" return self._done and 200 <= self.status_code < 300 and not self.error
[docs] def json(self): """Parse response_body as JSON. Returns None on failure.""" try: return json.loads(self.response_body) except (json.JSONDecodeError, TypeError): return None
[docs] def reset(self): """Reset the response state for reuse.""" self.status_code = 0 self.response_body = "" self.response_headers = {} self.error = "" self._done = False self._sending = False
# ------------------------------------------------------------------ # Internal # ------------------------------------------------------------------ def _do_request(self): """Execute the HTTP request (runs in background thread).""" try: merged_headers = dict(self.headers) data_bytes = None if self.request_body and self.method in ("POST", "PUT", "PATCH"): data_bytes = self.request_body.encode("utf-8") merged_headers.setdefault("Content-Type", self.content_type) req = urllib.request.Request( self.url, data=data_bytes, headers=merged_headers, method=self.method ) try: with urllib.request.urlopen(req, timeout=self.timeout) as resp: self.response_body = resp.read().decode("utf-8", errors="replace") self.response_headers = dict(resp.headers) self.status_code = resp.status except urllib.error.HTTPError as e: self.status_code = e.code try: self.response_body = e.read().decode("utf-8", errors="replace") except Exception: pass self.response_headers = dict(e.headers) if e.headers else {} self.error = str(e.reason) except Exception as e: self.error = str(e) finally: self._done = True self._sending = False # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------
[docs] def on_destroy(self): if self._thread and self._thread.is_alive(): self._thread.join(timeout=0.5) self._thread = None