An asynchronous, queue-based logging framework for Python with structured output and zero-dependency configuration.
- What is Ko-Log?
- Why Ko-Log?
- Features
- Installation
- Quick Start
- Core Concepts
- Configuration
- Handlers
- Processors and Renderers
- Advanced Usage
- Type Safety
- Error Handling
- Testing
- Philosophy
- For Contributors
- License
- Documentation
- Acknowledgements
Ko-Log is an async-first logging framework that decouples log generation from I/O. It uses a queue-based dispatch system to ensure non-blocking writes, supports both sync and async logging APIs, and provides a modular pipeline for formatting and filtering log events.
Unlike traditional loggers that block on every write, Ko-Log pushes records to an async queue and lets a background worker handle the I/O. This makes it suitable for high-throughput async task workers and any application where blocking on logs is unacceptable.
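The general pattern behind this design can be sketched, independently of Ko-Log's actual internals, with an `asyncio.Queue` and a background consumer. All class and method names below are illustrative, not Ko-Log's API:

```python
import asyncio

class QueueLogger:
    """Minimal sketch of queue-based logging: enqueue is non-blocking,
    and a background worker performs the (potentially slow) I/O."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()
        self.written: list[str] = []  # stand-in for a real I/O destination

    def info(self, event: str) -> None:
        # put_nowait never blocks the caller on I/O
        self.queue.put_nowait(event)

    async def worker(self) -> None:
        while True:
            record = await self.queue.get()
            if record is None:  # shutdown sentinel
                break
            self.written.append(record)  # real code would await a file/socket write

async def main() -> list[str]:
    logger = QueueLogger()
    task = asyncio.create_task(logger.worker())
    logger.info("request handled")  # returns immediately
    logger.info("cache miss")
    logger.queue.put_nowait(None)   # signal shutdown
    await task
    return logger.written

print(asyncio.run(main()))  # ['request handled', 'cache miss']
```

The caller only pays the cost of an in-memory enqueue; everything slow happens in the worker task.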
I experienced the following problems:
- Traditional logging blocks fully-async applications.
- Every `log.info()` waits for disk I/O or network sockets.
- Async code gets polluted with sync logging calls.
So, I needed a logger that:
- Follows an async-first design, with non-blocking enqueues and background dispatch
- Enforces structured logging through context binding, JSON output, and support for custom processors
- Is configuration-driven, with loggers, handlers, and processors defined in JSON/YAML
- Has flexible routing, with hierarchical logger names, per-handler processors, and level-based filtering
- Supports strict typing
- ✅ Dual API - Sync (`logger.info()`) and async (`await logger.ainfo()`) methods.
- ✅ Queue-Based Dispatch - Background worker processes logs asynchronously.
- ✅ Backpressure Policies - Drop, block, or drop-oldest when the queue is full.
- ✅ Structured Context - Bind key-value pairs to loggers for correlation.
- ✅ Processor Pipeline - Filter by level, add callsite info, serialize exceptions.
- ✅ Multiple Renderers - Plain text, JSON, colored console output (via Rich).
- ✅ Handler Types - File, rotating file, stream (stdout/stderr), null.
- ✅ Lifecycle Contexts - Measure execution time with `info_life()`, `ainfo_life()`.
- ✅ Type-safe Configuration - Pydantic models validate JSON/YAML at runtime.
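The backpressure policies listed above map naturally onto a bounded queue. A rough sketch of how drop, block, and drop-oldest could behave — this is the idea only, not Ko-Log's implementation:

```python
import asyncio

async def enqueue(queue: asyncio.Queue, item: str, policy: str) -> bool:
    """Apply a backpressure policy when the queue may be full.
    Returns True if the item was enqueued."""
    if policy == "block":
        await queue.put(item)          # wait until space frees up
        return True
    if policy == "drop":
        try:
            queue.put_nowait(item)     # discard the new record if full
            return True
        except asyncio.QueueFull:
            return False
    if policy == "drop_oldest":
        try:
            queue.put_nowait(item)
        except asyncio.QueueFull:
            queue.get_nowait()         # evict the oldest record...
            queue.put_nowait(item)     # ...to make room for the newest
        return True
    raise ValueError(f"unknown policy: {policy}")

async def demo() -> list[str]:
    q: asyncio.Queue = asyncio.Queue(maxsize=2)
    for event in ("a", "b", "c"):
        await enqueue(q, event, "drop_oldest")
    return [q.get_nowait() for _ in range(q.qsize())]

print(asyncio.run(demo()))  # ['b', 'c']
```

With `drop_oldest`, the newest record always survives at the cost of older ones; with `drop`, bursts are shed silently; with `block`, the caller slows down to the worker's pace.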
pip install git+https://github.com/Amjko2234/ko-log.git

Or install from source:

git clone https://github.com/Amjko2234/ko-log.git
cd ko-log
pip install -e .

import asyncio
from pathlib import Path
from ko_log import LoggerFactory, QueueManager
from ko_log.models import (
    LoggingSystemConfig, LoggerConfig, QueueConfig,
    HandlerConfig, HandlerType, RendererConfig, RendererType,
    PlainStreamRendererConfig, AsyncStreamHandlerConfig,
)
async def main():
    # 1. Configure queue manager
    queue_config = QueueConfig(
        max_queue_size=10_000,
        backpressure_policy="block",
        drain_timeout=5.0,
    )
    queue_manager = QueueManager(config=queue_config)
    await queue_manager.start()

    # 2. Configure logging system
    system_config = LoggingSystemConfig(
        loggers=[
            LoggerConfig(
                name="app",
                level="INFO",
                handlers=[
                    HandlerConfig(
                        type=HandlerType.STREAM,
                        renderer=RendererConfig(
                            type=RendererType.STREAM_PLAIN,
                            params=PlainStreamRendererConfig(
                                fmt="[%(asctime)s] [%(level)-8s] [%(name)s]: %(event)s",
                                datefmt="%Y-%m-%d %H:%M:%S",
                            ),
                        ),
                        params=AsyncStreamHandlerConfig(use_stderr=False),
                    ),
                ],
            ),
        ],
    )

    # 3. Create logger factory
    factory = LoggerFactory(
        config=system_config,
        queue_manager=queue_manager,
        log_path=Path("./factory.log"),
    )

    # 4. Get logger and use it
    logger = factory.get_logger("app")

    # Synchronous logging
    logger.info("Application started", version="1.0.0")

    # Asynchronous logging
    await logger.ainfo("Processing request", user_id=123)

    # Context binding
    bound_logger = logger.bind(service="auth", env="prod")
    bound_logger.warning("Rate limit exceeded", ip="192.168.1.1")

    # Lifecycle tracking
    async with logger.ainfo_life("Database migration"):
        await asyncio.sleep(0.5)  # Simulated work

    # 5. Cleanup
    await queue_manager.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

Output:
[2024-01-01 12:00:00] [INFO ] [app]: Application started
[2024-01-01 12:00:00] [INFO ] [app]: Processing request
[2024-01-01 12:00:00] [WARNING ] [app]: Rate limit exceeded
[2024-01-01 12:00:00] [INFO ] [app]: Begin: Database migration
[2024-01-01 12:00:01] [INFO ] [app]: End (0.50): Database migration
**Logger** — A `BoundLoggerBase` instance that carries context and provides logging methods. Loggers are immutable; calling `bind()` returns a new logger with merged context.
**QueueManager** — The central dispatch system. It receives `LogRecord` objects from loggers and routes them to registered handlers. It runs a background worker that processes the queue asynchronously.
**Handler** — Responsible for writing formatted messages to a destination (file, stdout, stderr). Each handler has:
- A renderer, which formats the log events.
- A processor pipeline, which filters/transforms the events before rendering.

Learn more about handlers here.
**Processor** — A callable that transforms an `EventDict`. Examples include:
- `add_callsite_params`: Adds filename, line number, function name.

Learn more about processors here.
**Renderer** — The final formatting step; converts an `EventDict` to a string. Types include:
- `JSONRenderer`: Structured JSON output with context.

Learn more about renderers here.
**LogRecord** — An immutable data structure containing:
- `logger_name`: For handler routing.
- `event`: Pre-processed log message.
- `timestamp`: Time of creation.
- `event_dict`: Complete structured data (level, context, callsite, etc.).
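Conceptually, such a record can be modeled as a frozen dataclass. The field names follow the list above, while the actual Ko-Log class may differ:

```python
from dataclasses import dataclass, field
from typing import Any
import time

@dataclass(frozen=True)
class LogRecordSketch:
    logger_name: str                                    # used for handler routing
    event: str                                          # pre-processed log message
    timestamp: float = field(default_factory=time.time) # time of creation
    event_dict: dict[str, Any] = field(default_factory=dict)

record = LogRecordSketch(
    logger_name="app",
    event="Application started",
    event_dict={"level": "INFO", "version": "1.0.0"},
)
# Frozen: `record.event = "..."` would raise dataclasses.FrozenInstanceError
print(record.logger_name, record.event_dict["level"])  # app INFO
```

Immutability matters here because the record crosses task boundaries: once enqueued, no processor or handler can accidentally mutate what another one sees.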
Ko-Log uses Pydantic models for type-safe configuration. You can define configs in Python or load from JSON/YAML.
However, I (Amjko2234) strongly discourage configuring in Python as in the Quick Start example above: not only does it immediately bloat your imports (especially if you are strictly typed), it also forces you to either place a configuration file (.py) inside your infrastructure codebase or place a Python file inside your configuration directory.
Although... the final judgement is still your call; you know your code better than I do.
From JSON
{
  "loggers": [
    {
      "name": "root",
      "level": "DEBUG",
      "handlers": [
        {
          "type": "file",
          "params": {
            "filename": "/var/log/app.log",
            "mode": "wb",
            "encoding": "utf-8",
            "override_existing": false
          },
          "renderer": {
            "type": "file_json",
            "params": {
              "fmt": "%(asctime)s - %(level)s - %(event)s",
              "datefmt": "%Y-%m-%d %H:%M:%S",
              "indentation": 2
            }
          },
          "processors": [
            {
              "type": "add_callsite_params",
              "params": {
                "parameters": ["filename", "lineno", "funcName"]
              }
            },
            {
              "type": "filter_by_level",
              "params": {
                "min_level": "INFO"
              }
            }
          ]
        }
      ],
      "context": {
        "app": "myapp",
        "env": "production"
      }
    }
  ],
  "default_level": "INFO"
}

Then load and use:
import json
from ko_log import LoggerFactory, QueueManager

# Read JSON config file
with open("config.json") as f:
    config = json.load(f)

# Set up the queue manager (run this inside an async function)
queue_manager = QueueManager.from_json({"max_queue_size": 10000})
await queue_manager.start()

# Create a factory, then a logger, from the config
factory = LoggerFactory.from_json(
    config=config,
    queue_manager=queue_manager,
    log_path="./factory.log",
)
logger = factory.get_logger("root")

**File** — Writes to a file. Lazily opens on first write.
Configuration through JSON
{
  "type": "file",
  "params": {
    "filename": "/var/log/app.log",
    "mode": "wb",
    "encoding": "utf-8",
    "override_existing": true
  },
  ...
}

Configuration through Python
HandlerConfig(
    type=HandlerType.FILE,
    params=AsyncFileHandlerConfig(
        filename="/var/log/app.log",
        mode="wb",
        encoding="utf-8",
        override_existing=True,
    ),
    ...
)

**Rotating File** — Rotates logs based on file size or time.
Configuration through JSON
{
  "type": "rotating_file",
  "params": {
    "filename": "/var/log/app.log",
    "max_bytes": 10485760,
    "backup_count": 5,
    "rotation_interval": null
  },
  ...
}

Configuration through Python
HandlerConfig(
    type=HandlerType.ROTATING_FILE,
    params=AsyncRotatingFileHandlerConfig(
        filename="/var/log/app.log",
        max_bytes=10_485_760,  # 10 MB
        backup_count=5,
        rotation_interval=None,
    ),
    ...
)

**Stream** — Writes to stdout/stderr.
Configuration through JSON
{
  "type": "stream",
  "params": {
    "use_stderr": false
  },
  ...
}

Configuration through Python
HandlerConfig(
    type=HandlerType.STREAM,
    params=AsyncStreamHandlerConfig(use_stderr=False),
    ...
)

**Null** — Discards all logs (unless a `Sink` is attached). Useful only for testing.
Configuration through JSON
{
  "type": "null",
  ...
}

Configuration through Python
HandlerConfig(
    type=HandlerType.NULL,
    ...
)

| Processor | Purpose |
|---|---|
| `add_callsite_params` | Adds `filename`, `lineno`, `funcName`, `module`, `pathname` |
| `add_context_defaults` | Merges default key-value pairs into context |
| `dict_tracebacks` | Converts `exc_info` to a structured dict with traceback frames |
| `filter_by_level` | Drops logs below `min_level` (raises `DropLog`) |
| `filter_keys` | Removes specified keys from the event dict |
| `filter_markup` | Strips Rich markup tags from messages |
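The table implies a pipeline that threads an event dict through each processor, where any processor can abort the record. A minimal sketch of that contract — `DropLog` here is a stand-in for Ko-Log's own exception, and the two processors are simplified re-implementations:

```python
from typing import Any, Callable

EventDict = dict[str, Any]
Processor = Callable[[EventDict], EventDict]

class DropLog(Exception):
    """Raised by a processor to discard the record."""

LEVELS = {"DEBUG": 10, "INFO": 20, "WARNING": 30, "ERROR": 40}

def filter_by_level(min_level: str) -> Processor:
    def processor(event_dict: EventDict) -> EventDict:
        if LEVELS[event_dict["level"]] < LEVELS[min_level]:
            raise DropLog
        return event_dict
    return processor

def filter_keys(*keys: str) -> Processor:
    def processor(event_dict: EventDict) -> EventDict:
        return {k: v for k, v in event_dict.items() if k not in keys}
    return processor

def run_pipeline(event_dict, processors):
    try:
        for processor in processors:
            event_dict = processor(event_dict)
        return event_dict
    except DropLog:
        return None  # record is silently discarded

pipeline = [filter_by_level("INFO"), filter_keys("password")]
print(run_pipeline({"level": "DEBUG", "event": "noise"}, pipeline))  # None
print(run_pipeline({"level": "ERROR", "event": "boom", "password": "x"}, pipeline))
# {'level': 'ERROR', 'event': 'boom'}
```

Each processor stays a plain `EventDict -> EventDict` callable, which is what makes the pipeline composable.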
| Renderer | Output |
|---|---|
| `PlainRenderer` | Percent-style formatted output: `%(asctime)s - %(level)s - %(event)s` |
| `JSONRenderer` | Formatted message + JSON context block |
| `ColoredRenderer` | ANSI-colored output via Rich (for terminals) |
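To make the renderer distinction concrete, here is a rough sketch of a percent-style plain renderer and a JSON renderer formatting the same event dict — illustrative only, not Ko-Log's code:

```python
import json
import time

event_dict = {
    "asctime": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(0)),
    "level": "INFO",
    "event": "Application started",
    "context": {"env": "prod"},
}

def render_plain(fmt: str, event_dict: dict) -> str:
    # Percent-style interpolation, like the stdlib logging formatter
    return fmt % event_dict

def render_json(event_dict: dict) -> str:
    return json.dumps(event_dict, sort_keys=True)

print(render_plain("%(asctime)s - %(level)s - %(event)s", event_dict))
# 1970-01-01 00:00:00 - INFO - Application started
print(render_json(event_dict))
```

A renderer is the last stage of the pipeline: it receives the fully processed event dict and returns the string the handler will write.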
In future updates, more processors and renderers will be added.
from ko_log.types import EventDict, Processor

# Configure the processor (`CustomConfig` is your own config model)
def add_request_id(config: CustomConfig) -> Processor:
    request_id = config.params.request_id

    # Define the processor
    def processor(event_dict: EventDict) -> EventDict:
        event_dict.setdefault("request_id", request_id)
        return event_dict

    return processor

Register it in `processor_map` and use it in your config.
Bind new context to loggers:
logger = factory.get_logger("app")
# Bind immutable context
api_logger = logger.bind(service="api", version="v1")
api_logger.info("Request received", endpoint="/users")
# Chain bindings
request_logger = api_logger.bind(request_id="abc123")
request_logger.info("Processing", user_id=456)

Explicitly mark a context lifecycle's start and end in logs:
# Sync
with logger.info_life("Database query", table="users") as log:
    # Logs "Begin: Database query"
    do_work()
    log.info("Did work")
# Logs "End (0.15): Database query" on exit

# Async
async with logger.ainfo_life("API call", endpoint="/data") as log:
    await fetch_data()
    await log.ainfo("Data fetched")

Support for capturing exception information and tracebacks:
try:
    risky_operation()
except ValueError:
    logger.error("Operation failed", operation="risky")
    # Automatically captures sys.exc_info() if the `dict_tracebacks` processor is enabled

Ko-Log is fully typed with Pydantic models:
from ko_log.models import LoggerConfig, HandlerConfig
from pydantic import ValidationError
try:
    config = LoggerConfig(
        name="test",
        level="INVALID",  # ❌ Validation error
    )
except ValidationError as e:
    print(e)

All public APIs have type hints. Use mypy or pyright for static analysis:
mypy your_app.py
pyright your_app.py

Ko-Log defines custom exceptions with structured error codes for diagnostics and monitoring. All exceptions inherit from `_BaseException` and include machine-readable error codes.
| Exception | Raised When | Common Causes |
|---|---|---|
| `AlConfigurationError` | Configuration validation fails | Missing logger in config, invalid enum values, malformed JSON |
| `AlLoggerCreationError` | Logger instantiation fails | Handler creation error, processor registration failure, invalid context |
| `AlLoggerError` | Runtime logging operation fails | Processor chain error, invalid event dict, serialization failure |
| `AlHandlerError` | Handler I/O operations fail | File not writable, disk full, network timeout, permission denied |
| `AlProcessorError` | Processor execution fails | Unknown processor type, invalid transformation, missing required field |
| `AlQueueManagerError` | Queue dispatch or routing fails | Handler not registered, dispatch timeout, worker crash |
Error codes follow the format:

LAYER::Service::CATEGORY::SEVERITY[::RECOVERABLE]

Example: `HANDLER::LoggerFactory::CONFIGURATION::ERROR`
- `LAYER`: Where the error occurred (e.g., `PROCESSOR`, `HANDLER`)
- `Service`: Component name (e.g., `LoggerFactory`, `QueueManager`)
- `CATEGORY`: Error type (e.g., `IO`, `CONFIGURATION`)
- `SEVERITY`: `WARNING`, `ERROR`, or `CRITICAL`
- `RECOVERABLE` (optional): Indicates whether the operation can be retried
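Because the codes are machine-readable, monitoring code can split them on `::`. A small illustrative parser — the field names come from the format above, and Ko-Log itself may expose its own accessors instead:

```python
from dataclasses import dataclass

@dataclass
class ErrorCode:
    layer: str
    service: str
    category: str
    severity: str
    recoverable: bool

def parse_error_code(code: str) -> ErrorCode:
    parts = code.split("::")
    if len(parts) not in (4, 5):
        raise ValueError(f"malformed error code: {code!r}")
    return ErrorCode(
        layer=parts[0],
        service=parts[1],
        category=parts[2],
        severity=parts[3],
        # The fifth segment is optional and marks a retryable failure
        recoverable=len(parts) == 5 and parts[4] == "RECOVERABLE",
    )

code = parse_error_code("HANDLER::LoggerFactory::CONFIGURATION::ERROR")
print(code.severity, code.recoverable)  # ERROR False
```

This kind of parsing lets alerting rules key off `severity` or `category` without string-matching whole messages.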
try:
    logger = factory.get_logger("non_existent")
except AlConfigurationError as e:
    print(e.msg)   # "Logger `non_existent` not found"
    print(e.code)  # "CONFIGURATION::LoggerFactory::VALIDATION::ERROR"

try:
    handler = AsyncFileHandler(
        renderer=renderer,
        processors=[],
        filename="/read-only/path/app.log",
        mode="wb",
        encoding="utf-8",
        override_existing=True,
    )
    handler._write_sync("Test message")
except AlHandlerError as e:
    print(e.msg)   # "Failed to open the file at path `/read-only/path/app.log`"
    print(e.code)  # "HANDLER::AsyncFileHandler::IO::ERROR"
    # Log to fallback handler, send alert, etc.

try:
    factory.get_logger_from_json({
        "name": "app",
        "processors": [{"type": "unknown_processor"}]
    })
except AlLoggerCreationError as e:
    print(e.msg)        # "Failed to create logger `app`"
    print(e.__cause__)  # AlProcessorError: Unknown processor type: unknown_processor

try:
    queue_manager.push_sync(record)
except AlQueueManagerError as e:
    print(e.msg)   # "Failed to synchronously emit log message of logger `root` to handlers `[<$AsyncNullHandler>]`"
    print(e.code)  # "DISPATCH::QueueManager::ROUTING::ERROR"

All Ko-Log exceptions expose:
exception.msg          # Human-readable message
exception.msg_code     # Message + error code
exception.code         # Machine-readable error code
exception.user_msg     # Optional user-facing message
exception.recoverable  # Boolean: can retry?
exception.__cause__    # Underlying exception (if any)

Exceptions can carry structured context for debugging:
try:
    processor(event_dict)
except AlProcessorError as e:
    print(repr(e))  # Shows JSON context
    # "PROCESSOR::ProcessorName::FORMATTING::ERROR:
    # {
    #   "event_dict": {...},
    #   "processor_type": "filter_by_level",
    #   "timestamp": "2024-01-01T12:00:00Z"
    # }"

- Catch specific exceptions at API boundaries:

      try:
          logger = factory.get_logger("app")
      except AlConfigurationError:
          # Use fallback logger
          logger = factory.get_logger("root")

- Log exceptions with context:

      try:
          risky_operation()
      except AlHandlerError as e:
          fallback_logger.error(
              "Handler failed",
              error_code=e.code,
              path=e._ctx.get("path"),
          )

- Use error codes for monitoring:

      except AlQueueManagerError as e:
          if "CRITICAL" in e.code:
              send_alert(e.msg_code)

- Check recoverability:

      except AlHandlerError as e:
          if e.recoverable:
              retry_with_backoff()
          else:
              switch_to_fallback_handler()
Ko-Log provides a Sink mechanism for capturing output in tests:
import pytest
from collections.abc import Generator
from ko_log import Sink, QueueManager, LoggerFactory

@pytest.fixture
def logger_with_sink(
    queue_manager: QueueManager, factory: LoggerFactory
) -> Generator[tuple[LoggerFactory, Sink], None]:
    # Get a logger and attach the `Sink` to its handler
    logger = factory.get_logger("test")
    sink = Sink()
    queue_manager.add_sink(logger_name="test", sink=sink)
    yield logger, sink
    # Necessary if the handler's I/O is to be used afterwards
    queue_manager.remove_sink(logger_name="test")

def test_logging(logger_with_sink: tuple[LoggerFactory, Sink]):
    logger, sink = logger_with_sink
    logger.info("Test message")
    # `Sink` captures all outputs
    assert len(sink.events) == 1
    assert "Test message" in sink.events[0]

This is a personal tool built for a specific use case. It follows these simple principles:
- Async by Default - Logging should never block your application
- Configuration Over Code - Define logging behavior in JSON/YAML, not Python
- Structured Output - Every log is a structured event, not just a string
- Processor Pipelines - Composable transformations over monolithic formatters
- Type Safety First - Validate configs at runtime with Pydantic, catch errors early
- Pythonic Patterns - context managers, protocols, type hints
While this is primarily a personal project, I'm open to:
- Bug reports with reproducible examples
- Documentation improvements
- Performance optimizations
- Feature requests (with clear use cases)
If you find it useful, that's reward enough. ✨
git clone https://github.com/Amjko2234/ko-log.git
cd ko-log
python -m venv .venv
source .venv/bin/activate  # or .venv\Scripts\activate on Windows
pip install -e ".[dev]"

Run the tests:

pytest tests/ -vv

- Formatter: `black`, `isort`
- Linter: `ruff`
- Type Checker: `basedpyright`
MIT License. See LICENSE for details.
- How to Create Custom Processors
- How to Create Custom Renderers
- How to Create Custom Handlers
- How to Extend Log Records
- Testing Guide
- Architecture Overview
Creation of Ko-Log was inspired by:
- structlog: Structured logging with processor pipelines
- loguru: Simplicity and async support
- Python's asyncio and logging stdlib modules
Built for practical use, shared in case it helps others build better software. 🙂