Source code for core.logger

from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from threading import RLock
from typing import Any, Callable
import sys


[docs] @dataclass(frozen=True) class LogRecord: timestamp: str level: str level_value: int subsystem: str message: str data: dict[str, Any]
[docs] class LogLevels: DEBUG = 10 INFO = 20 WARNING = 30 ERROR = 40 _name_to_value = { "DEBUG": DEBUG, "INFO": INFO, "WARNING": WARNING, "ERROR": ERROR } _value_to_name = { DEBUG: "DEBUG", INFO: "INFO", WARNING: "WARNING", ERROR: "ERROR" }
[docs] @classmethod def parse(cls, level: int | str): if isinstance(level, str): return cls._name_to_value.get(level.upper(), cls.INFO) return int(level)
[docs] @classmethod def name(cls, level: int): return cls._value_to_name.get(int(level), "INFO")
_lock = RLock() _sinks: list[Callable[[LogRecord], None]] = [] _min_level = LogLevels.INFO def _default_sink(record: LogRecord): if record.data: msg = f"[{record.timestamp}] [{record.level}] [{record.subsystem}] {record.message} | {record.data}" print(msg) else: msg = f"[{record.timestamp}] [{record.level}] [{record.subsystem}] {record.message}" print(msg) if sys.stdout is not None: sys.stdout.flush()
[docs] def add_sink(sink: Callable[[LogRecord], None]): with _lock: if sink not in _sinks: _sinks.append(sink)
[docs] def remove_sink(sink: Callable[[LogRecord], None]): with _lock: if sink in _sinks: _sinks.remove(sink)
[docs] def set_min_level(level: int | str): global _min_level _min_level = LogLevels.parse(level)
[docs] def get_min_level(): return _min_level
[docs] def emit(level: int | str, subsystem: str, message: str, **data): level_value = LogLevels.parse(level) if level_value < _min_level: return record = LogRecord( timestamp=datetime.utcnow().isoformat(timespec="milliseconds") + "Z", level=LogLevels.name(level_value), level_value=level_value, subsystem=subsystem or "engine", message=str(message), data=data if data else {} ) with _lock: active_sinks = list(_sinks) if not active_sinks: _default_sink(record) return for sink in active_sinks: try: sink(record) except Exception: _default_sink(record)
[docs] class Logger: def __init__(self, subsystem: str): self.subsystem = subsystem
[docs] def debug(self, message: str, **data): emit(LogLevels.DEBUG, self.subsystem, message, **data)
[docs] def info(self, message: str, **data): emit(LogLevels.INFO, self.subsystem, message, **data)
[docs] def warning(self, message: str, **data): emit(LogLevels.WARNING, self.subsystem, message, **data)
[docs] def error(self, message: str, **data): emit(LogLevels.ERROR, self.subsystem, message, **data)
[docs] def get_logger(subsystem: str): return Logger(subsystem)