Source code for pdfbeaver.registry

# src/pdfbeaver/registry.py
"""
A user-friendly registry for stream handlers.

Implements the StreamHandler protocol (consumed by StreamEditor)
but allows function-based registration with flexible signatures.
"""

import inspect
import logging
from typing import Any, Callable, Dict, List, Set, Union

from .editor import (
    ORIGINAL_BYTES,
    ContentStreamInstruction,
    NormalizedOperand,
    StreamContext,
)

logger = logging.getLogger(__name__)


class HandlerRegistry:
    """
    A user-friendly registry for stream handlers.

    Implements the StreamHandler protocol (consumed by StreamEditor)
    but allows function-based registration with flexible signatures.

    Handlers are stored per operator string. Each stored handler is a wrapper
    that maps the fixed ``(operands, context, raw_bytes, op_name)`` call onto
    whichever keyword arguments the user's function declared.
    """

    def __init__(self):
        # Maps an operator string to the wrapper built by ``register``.
        self._handlers: Dict[str, Callable] = {}

        # Helper for users to return "do nothing" (pass-through)
        # pylint: disable=invalid-name
        self.PASS_THROUGH = [ORIGINAL_BYTES]

    @property
    def modified_operators(self) -> Set[str]:
        """Returns the set of operators registered for interception."""
        return set(self._handlers.keys())

    def register(self, *ops: str):
        """
        Decorator to register a function for specific operators.

        The decorated function can accept any combination of the following
        arguments (detected by name):

        * ``args`` or ``arguments`` or ``operands``: ``List[NormalizedOperand]``
        * ``context``: :class:`~pdfbeaver.editor.StreamContext`
        * ``raw_bytes``: ``bytes``
        * ``op`` or ``operator``: ``str``
        * ``pdf``: ``pikepdf.Pdf``
        * ``page``: ``pikepdf.Page``
        * ``container``: the value of ``context.container``

        ``pdf``, ``page`` and ``container`` are read from the context and are
        ``None`` when the context itself is falsy.

        Example:

        .. code-block:: python

            @registry.register("Tj", "TJ")
            def my_handler(operands, context):
                ...

        Args:
            *ops: One or more operator strings (e.g., "Tj", "Do", "re")
                to intercept.

        Returns:
            A decorator. It stores a wrapper around the function for every
            operator in ``ops`` and returns the original function unchanged.

        Raises:
            ValueError: At decoration time, if the function declares a
                parameter whose name is not one of the names listed above.
        """

        def decorator(func: Callable):
            sig = inspect.signature(func)
            params = set(sig.parameters)
            allowed_params = {
                "args",
                "arguments",
                "container",
                "context",
                "op",
                "operands",
                "operator",
                "page",
                "pdf",
                "raw_bytes",
            }

            # Collect the offending names themselves (in signature order) so
            # the error can name them; ``any(...)`` would only yield a bool.
            bad_params = [
                name for name in sig.parameters if name not in allowed_params
            ]
            if bad_params:
                raise ValueError(
                    f"Parameter name {', '.join(bad_params)} not allowed. "
                    f"Allowed parameter names are: "
                    f"{', '.join(sorted(allowed_params))}"
                )

            def wrapper(operands, context, raw_bytes, op_name):
                # Every value a handler may ask for, keyed by each accepted
                # parameter name (aliases map to the same object).
                kwargs_all = {
                    "args": operands,
                    "arguments": operands,
                    "container": context.container if context else None,
                    "context": context,
                    "op": op_name,
                    "operands": operands,
                    "operator": op_name,
                    "page": context.page if context else None,
                    "pdf": context.pdf if context else None,
                    "raw_bytes": raw_bytes,
                }
                # Pass only what the function's signature declares.
                kwargs = {k: kwargs_all[k] for k in params}
                result = func(**kwargs)
                return self._normalize_return_value(result)

            for op in ops:
                self._handlers[op] = wrapper

            return func

        return decorator

    def _normalize_return_value(self, result: Any) -> List[Any]:
        """
        Ensures the handler return value is always a list of instructions.

        Allows users to return single items (str, bytes, tuple) without
        wrapping in [].

        Args:
            result: Whatever the user's handler returned.

        Returns:
            ``[]`` for ``None``; the same list object for a ``list``; a
            materialized list for a generator, ``map`` or ``filter``;
            otherwise ``[result]``.
        """
        if result is None:
            return []

        # If it's a list (and not a string/bytes), assume it's a list of
        # instructions
        if isinstance(result, list):
            return result

        # Handle generators
        if isinstance(result, (map, filter)) or inspect.isgenerator(result):
            return list(result)

        # If it's a single item (str, bytes, tuple, Sentinel), wrap it
        return [result]

    def handle_operator(
        self,
        op: str,
        operands: "List[NormalizedOperand]",
        context: "StreamContext",
        raw_bytes: bytes,
    ) -> "List[Union[ContentStreamInstruction, bytes]]":
        """
        Standard entry point called by StreamEditor.

        Args:
            op: The operator string to dispatch on.
            operands: The operands forwarded to the handler.
            context: The stream context forwarded to the handler.
            raw_bytes: The original bytes of the instruction.

        Returns:
            ``[raw_bytes]`` when no handler is registered for ``op``;
            otherwise the registered handler's normalized result list.
        """
        handler = self._handlers.get(op)
        if not handler:
            return [raw_bytes]

        return handler(operands, context, raw_bytes, op)
# Module-level HandlerRegistry instance, created when this module is imported.
# NOTE(review): presumably the shared default for callers that do not build
# their own registry — its consumers are outside this file; confirm.
default_registry = HandlerRegistry()