Files
open-webui/backend/open_webui/utils/logger.py
Andrei Efanov 9e81e1dda1 feat: add LOG_FORMAT=json for structured JSON logging (#21747)
* feat: add LOG_FORMAT env var with JSON formatter for early logging

Introduce LOG_FORMAT environment variable (set to "json" to enable).
When active, logging.basicConfig() uses a JSONFormatter that outputs
single-line JSON objects with fields: ts, level, msg, caller, error,
stacktrace. This covers all log messages emitted during module imports
before Loguru's start_logger() takes over.

* feat: add JSON sink for Loguru when LOG_FORMAT=json

Add _json_sink() as a Loguru sink function that writes single-line JSON
to stdout. In start_logger(), conditionally use the JSON sink instead of
the plain-text stdout_format when LOG_FORMAT is set to "json".

* feat: suppress ASCII banner and fix alembic logging in JSON mode

- Wrap the ASCII art banner print in main.py with a LOG_FORMAT != "json"
  guard so JSON output stays machine-parseable.
- Skip alembic's fileConfig() call in migrations/env.py when
  LOG_FORMAT=json to prevent it from replacing the JSON log handlers
  installed during early startup.
2026-02-22 17:40:17 -06:00

201 lines
6.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import logging
import sys
from typing import TYPE_CHECKING
from loguru import logger
from opentelemetry import trace
from open_webui.env import (
ENABLE_AUDIT_STDOUT,
ENABLE_AUDIT_LOGS_FILE,
AUDIT_LOGS_FILE_PATH,
AUDIT_LOG_FILE_ROTATION_SIZE,
AUDIT_LOG_LEVEL,
GLOBAL_LOG_LEVEL,
LOG_FORMAT,
AUDIT_UVICORN_LOGGER_NAMES,
ENABLE_OTEL,
ENABLE_OTEL_LOGS,
_LEVEL_MAP,
)
if TYPE_CHECKING:
from loguru import Message, Record
def stdout_format(record: "Record") -> str:
    """
    Build the console (stdout) format string for a Loguru record.

    The produced layout carries a timestamp, the padded level name, the
    source location (module:function:line), the message itself and — when
    the record has bound extra context — that context serialized as JSON.

    Parameters:
        record (Record): Loguru record dict with time, level, name,
            function, line, message and any bound extra context.

    Returns:
        str: A Loguru format string for the stdout sink.
    """
    has_extra = bool(record["extra"])
    if has_extra:
        # Serialize the bound context before the sink expands the format;
        # the JSON is stashed on the record for the placeholder below.
        record["extra"]["extra_json"] = json.dumps(record["extra"])
    suffix = " - {extra[extra_json]}" if has_extra else ""
    segments = (
        "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | ",
        "<level>{level: <8}</level> | ",
        "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - ",
        "<level>{message}</level>",
        suffix,
        "\n{exception}",
    )
    return "".join(segments)
def _json_sink(message: "Message") -> None:
    """Emit a Loguru record to stdout as a single JSON line.

    Installed as the Loguru sink when LOG_FORMAT is set to "json".
    """
    rec = message.record
    level_name = rec["level"].name
    entry = {
        "ts": rec["time"].strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z",
        # Normalize the level through the shared map; unknown levels fall
        # back to their lowercased Loguru name.
        "level": _LEVEL_MAP.get(level_name, level_name.lower()),
        "msg": rec["message"],
        "caller": "{}:{}:{}".format(rec["name"], rec["function"], rec["line"]),
    }
    extra = rec["extra"]
    if extra:
        entry["extra"] = extra
    exception = rec["exception"]
    if exception is not None:
        entry["error"] = "".join(exception.format_exception()).rstrip()
    # default=str keeps non-JSON-serializable extras from raising here.
    line = json.dumps(entry, ensure_ascii=False, default=str)
    sys.stdout.write(line + "\n")
    sys.stdout.flush()
class InterceptHandler(logging.Handler):
    """
    Intercepts log records from Python's standard logging module
    and redirects them to Loguru's logger.
    """

    def emit(self, record):
        """
        Called by the standard logging module for each log event.
        It transforms the standard `LogRecord` into a format compatible with Loguru
        and passes it to Loguru's logger.
        """
        # Map the stdlib level name onto Loguru's level; fall back to the
        # numeric level when Loguru has no level registered under that name.
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        # Walk up the stack past the stdlib logging frames so Loguru reports
        # the original call site (Loguru's documented intercept recipe).
        frame, depth = sys._getframe(6), 6
        while frame and frame.f_code.co_filename == logging.__file__:
            frame = frame.f_back
            depth += 1

        logger.opt(depth=depth, exception=record.exc_info).bind(
            **self._get_extras()
        ).log(level, record.getMessage())

        # Also forward the record to the OpenTelemetry handler when OTEL log
        # export is enabled; imported lazily so the telemetry module is only
        # loaded when actually needed.
        if ENABLE_OTEL and ENABLE_OTEL_LOGS:
            from open_webui.utils.telemetry.logs import otel_handler

            otel_handler.emit(record)

    def _get_extras(self):
        # Bind the active trace/span ids (when a valid span exists) so log
        # lines can be correlated with OTEL traces; empty dict otherwise.
        if not ENABLE_OTEL:
            return {}
        extras = {}
        context = trace.get_current_span().get_span_context()
        if context.is_valid:
            extras["trace_id"] = trace.format_trace_id(context.trace_id)
            extras["span_id"] = trace.format_span_id(context.span_id)
        return extras
def file_format(record: "Record"):
    """
    Serialize an audit record's bound data into JSON for the file sink.

    Parameters:
        record (Record): Loguru record whose ``extra`` dict carries the
            audit fields bound by the audit middleware.

    Returns:
        str: A Loguru format string that expands to the JSON payload
            followed by a newline.
    """
    extra = record["extra"]
    lookup = extra.get
    audit_data = {
        "id": lookup("id", ""),
        # Epoch seconds of the log event itself.
        "timestamp": int(record["time"].timestamp()),
        "user": lookup("user", dict()),
        "audit_level": lookup("audit_level", ""),
        "verb": lookup("verb", ""),
        "request_uri": lookup("request_uri", ""),
        "response_status_code": lookup("response_status_code", 0),
        "source_ip": lookup("source_ip", ""),
        "user_agent": lookup("user_agent", ""),
        "request_object": lookup("request_object", b""),
        "response_object": lookup("response_object", b""),
        "extra": lookup("extra", {}),
    }
    # default=str covers bytes request/response bodies; the serialized JSON
    # is stashed on the record so the format string below can expand it.
    extra["file_extra"] = json.dumps(audit_data, default=str)
    return "{extra[file_extra]}\n"
def start_logger():
    """
    Initialize and configure Loguru's logger with distinct handlers:

    - A console (stdout) handler for general log messages (excluding those
      marked as auditable, depending on ENABLE_AUDIT_STDOUT). Emits
      single-line JSON via _json_sink when LOG_FORMAT == "json", otherwise
      the colorized plain-text stdout_format.
    - An optional rotating, zip-compressed file handler for audit logs,
      active when AUDIT_LOG_LEVEL != "NONE" and ENABLE_AUDIT_LOGS_FILE is
      enabled.

    Additionally, this function reconfigures Python's standard logging to
    route through Loguru (via InterceptHandler) and adjusts logging levels
    for Uvicorn loggers.
    """
    logger.remove()

    # PEP 8 (E731): a named def instead of a lambda assigned to a name.
    def audit_stdout_filter(record):
        # NOTE(review): when ENABLE_AUDIT_STDOUT is truthy, records tagged
        # "auditable" are dropped from stdout (they are written by the file
        # sink below); when falsy, every record passes. Confirm this matches
        # the intended meaning of ENABLE_AUDIT_STDOUT.
        return "auditable" not in record["extra"] if ENABLE_AUDIT_STDOUT else True

    if LOG_FORMAT == "json":
        # Machine-parseable single-line JSON output (no format= needed:
        # the sink serializes the record itself).
        logger.add(
            _json_sink,
            level=GLOBAL_LOG_LEVEL,
            filter=audit_stdout_filter,
        )
    else:
        logger.add(
            sys.stdout,
            level=GLOBAL_LOG_LEVEL,
            format=stdout_format,
            filter=audit_stdout_filter,
        )

    if AUDIT_LOG_LEVEL != "NONE" and ENABLE_AUDIT_LOGS_FILE:
        try:
            logger.add(
                AUDIT_LOGS_FILE_PATH,
                level="INFO",
                rotation=AUDIT_LOG_FILE_ROTATION_SIZE,
                compression="zip",
                format=file_format,
                # Only records explicitly tagged auditable reach the file.
                filter=lambda record: record["extra"].get("auditable") is True,
            )
        except Exception as e:
            # Best-effort: a bad path or rotation config must not abort startup.
            logger.error(f"Failed to initialize audit log file handler: {str(e)}")

    # Route all stdlib logging through Loguru; force=True replaces any
    # handlers installed earlier during startup.
    logging.basicConfig(
        handlers=[InterceptHandler()], level=GLOBAL_LOG_LEVEL, force=True
    )

    for uvicorn_logger_name in ["uvicorn", "uvicorn.error"]:
        uvicorn_logger = logging.getLogger(uvicorn_logger_name)
        uvicorn_logger.setLevel(GLOBAL_LOG_LEVEL)
        # Drop uvicorn's own handlers so records propagate to the root
        # InterceptHandler instead of being emitted twice.
        uvicorn_logger.handlers = []
    for uvicorn_logger_name in AUDIT_UVICORN_LOGGER_NAMES:
        uvicorn_logger = logging.getLogger(uvicorn_logger_name)
        uvicorn_logger.setLevel(GLOBAL_LOG_LEVEL)
        uvicorn_logger.handlers = [InterceptHandler()]

    logger.info(f"GLOBAL_LOG_LEVEL: {GLOBAL_LOG_LEVEL}")