# src/pdfbeaver/api.py
"""
Public API for the generic PDF Stream Editor.
"""
# src/pdfbeaver/api.py
import logging
from dataclasses import dataclass, field
from decimal import Decimal
from typing import Any, Dict, List, Optional, Set, Tuple, Type, Union
import pikepdf
from pdfminer.pdfdevice import PDFDevice
from pdfminer.pdfinterp import PDFResourceManager
from pdfminer.pdftypes import PDFStream
from pdfminer.psparser import LIT
from .editor import StreamEditor
from .optimization import optimize_ops
# Import the default registry from the sibling module
from .registry import HandlerRegistry, default_registry
from .state_iterator import StreamStateIterator
from .state_tracker import StateTracker
logger = logging.getLogger(__name__)
@dataclass
class ProcessingOptions:
    """Configuration options for the stream modification process.

    The same instance may be passed to several :func:`modify_page` calls. In
    that case ``visited_streams`` carries the XObject de-duplication state
    from one call to the next.
    """

    optimize: bool = True
    """If True, runs a peephole optimizer on the output stream to
    remove dead stores and consolidate arithmetic (e.g., combining
    absolute text matrices into relative moves). Defaults to True.
    """

    recurse_xobjects: bool = True
    """If True, recursively descends into and modifies Form XObjects
    found in the page resources. Defaults to True.
    """

    tracker_class: Type[StateTracker] = StateTracker
    """The class used to track PDF state (Graphics/Text) during
    parsing. Defaults to :class:`StateTracker`. Users can subclass
    this to add custom logic (e.g., font geometry tracking).
    """

    tracker_args: Tuple = field(default_factory=tuple)
    """Positional arguments passed to the ``tracker_class`` constructor."""

    tracker_kwargs: Dict[str, Any] = field(default_factory=dict)
    """Keyword arguments passed to the ``tracker_class`` constructor."""

    # The set stores pikepdf ``objgen`` values (object number, generation),
    # not plain integers, hence the tuple element type.
    visited_streams: Set[Tuple[int, int]] = field(default_factory=set)
    """Internal set of XObject ``objgen`` identifiers that were already
    visited. Used to prevent infinite recursion in malformed PDFs with
    cyclic XObject references.
    """
def modify_page(
    pdf: pikepdf.Pdf,
    page: pikepdf.Page,
    handler: HandlerRegistry,
    options: Optional[ProcessingOptions] = None,
) -> None:
    """Rewrite the content of one page, and optionally its Form XObjects, in-place.

    The page's own content stream is processed first with the callbacks
    registered on ``handler``. When ``options.recurse_xobjects`` is true, the
    Form XObjects reachable from the page's ``Resources`` are processed
    afterwards.

    Args:
        pdf: The owning :class:`pikepdf.Pdf` document. Needed to create the
            replacement stream object for the page contents.
        page: The :class:`pikepdf.Page` to modify.
        handler: A :class:`~pdfbeaver.registry.HandlerRegistry` instance
            holding the registered operator callbacks.
        options: Configuration options. A default :class:`ProcessingOptions`
            is created when ``None`` is given.

    Returns:
        None: The page is modified in-place.
    """
    opts = ProcessingOptions() if options is None else options

    # NOTE(review): ``is_root`` is left at its default (False) even though the
    # container here is the page itself - confirm this is intended.
    _modify_content_container(
        pdf=pdf,
        page=page,
        container=page,
        # A page without a Resources entry is handled as having none.
        resources=getattr(page, "Resources", {}),
        handler=handler,
        options=opts,
    )

    if not opts.recurse_xobjects:
        return

    page_resources = getattr(page, "Resources", {})
    _process_child_resources(pdf, page, page_resources, handler, opts)
def _process_child_resources(
    pdf: pikepdf.Pdf,
    page: Optional[pikepdf.Page],
    resources: Any,
    handler: HandlerRegistry,
    options: ProcessingOptions,
) -> None:
    """Recursively find and modify Form XObjects within a resource dictionary.

    Args:
        pdf: The owning document, forwarded to the content rewriter.
        page: The page the resources ultimately belong to, if any.
        resources: The resource dictionary to scan. Anything that is not a
            :class:`pikepdf.Dictionary` containing ``/XObject`` is ignored.
        handler: The registry holding the operator callbacks.
        options: Processing options. ``options.visited_streams`` is updated
            with every indirect XObject that is encountered.
    """
    if not isinstance(resources, pikepdf.Dictionary) or "/XObject" not in resources:
        return

    for name, xobj_ref in resources["/XObject"].items():
        # De-duplicate on the object/generation pair. Direct objects all
        # report (0, 0), so they must not be tracked: otherwise the first
        # direct entry would make every later direct entry look like a
        # duplicate. Direct objects cannot be part of a reference cycle, so
        # skipping them here does not weaken the recursion guard.
        obj_id = getattr(xobj_ref, "objgen", None)
        if obj_id is not None and obj_id != (0, 0):
            if obj_id in options.visited_streams:
                continue
            options.visited_streams.add(obj_id)

        # Only Form XObjects carry a content stream of their own.
        if xobj_ref.get("/Subtype") != "/Form":
            continue

        logger.debug("Recursing into Form XObject: %s", name)
        try:
            child_resources = xobj_ref.get("/Resources", {})
            _modify_content_container(
                pdf=pdf,
                page=page,
                container=xobj_ref,
                resources=child_resources,
                handler=handler,
                options=options,
            )
            # Descend into the XObjects nested inside this form.
            _process_child_resources(pdf, page, child_resources, handler, options)
        except pikepdf.PdfError as e:
            # Best effort: one broken form must not abort the whole page.
            logger.warning("Skipping malformed XObject %s: %s", name, e)
def _make_iterator_with_resources(resources):
    """Create a :class:`StreamStateIterator` initialised with ``resources``.

    The resources are first converted with ``_convert_to_pdfminer_resources``
    and then handed to the iterator's ``init_resources``.
    """
    resource_manager = PDFResourceManager()
    state_iterator = StreamStateIterator(
        resource_manager, PDFDevice(resource_manager)
    )
    state_iterator.init_resources(_convert_to_pdfminer_resources(resources))
    return state_iterator
def _modify_content_container(
    resources: Any,
    handler: HandlerRegistry,
    options: ProcessingOptions,
    pdf: Optional[pikepdf.Pdf] = None,
    page: Optional[pikepdf.Page] = None,
    container: Optional[pikepdf.Object] = None,
    is_root: bool = False,
) -> None:
    """Core worker: rewrite the content stream of a Page or XObject.

    Args:
        resources: Resource dictionary used to initialise the stream iterator.
        handler: The registry holding the operator callbacks.
        options: Supplies the tracker class with its arguments and the
            ``optimize`` switch.
        pdf: The owning document. Used to create the new contents stream when
            ``container`` is a page.
        page: Passed through to the :class:`StreamEditor`.
        container: The page or stream whose content is rewritten.
        is_root: Passed to the :class:`StreamEditor` as ``is_page_root``.
    """
    state_iterator = _make_iterator_with_resources(resources)

    content_streams = _get_clean_content_streams(container)
    if not content_streams:
        # Nothing to rewrite.
        return

    operations = state_iterator.execute(content_streams)
    state_tracker = options.tracker_class(
        *options.tracker_args, **options.tracker_kwargs
    )
    rewritten = StreamEditor(
        source_iterator=operations,
        handler=handler,
        tracker=state_tracker,
        optimizer=optimize_ops if options.optimize else None,
        page=page,
        container=container,
        is_page_root=is_root,
    ).process()

    if not isinstance(container, pikepdf.Page):
        # XObject (stream): update its data in place.
        container.write(rewritten)
        return

    # Page: replace Contents. If it was an array of streams before, it is now
    # a single consolidated stream.
    container.Contents = pdf.make_stream(rewritten)
def _get_clean_content_streams(container: Any) -> List[Any]:
    """Collect the usable content streams of ``container`` into one flat list.

    The container is resolved to its raw contents and normalised to a list.
    Every entry is then expanded by ``_process_content_item``, which may
    yield zero, one or several streams per entry.
    """
    entries = _normalize_to_list(_resolve_raw_contents(container))
    return [
        stream
        for entry in entries
        for stream in _process_content_item(entry)
    ]
def _resolve_raw_contents(container: Any) -> Any:
    """Return the contents of a Page, or ``container`` itself for anything else.

    For a :class:`pikepdf.Page` the ``contents`` attribute is tried first. If
    that raises ``AttributeError`` or ``pikepdf.PdfError``, the ``/Contents``
    entry is used instead, defaulting to an empty list.
    """
    if not isinstance(container, pikepdf.Page):
        return container

    try:
        raw = container.contents
    except (AttributeError, pikepdf.PdfError):
        raw = container.get("/Contents", [])
    return raw
def _normalize_to_list(raw_contents: Any) -> List[Any]:
    """Return ``raw_contents`` as a Python list.

    Lists and :class:`pikepdf.Array` objects are copied element by element.
    Any other value is wrapped in a one-element list.
    """
    is_sequence = isinstance(raw_contents, (list, pikepdf.Array))
    return list(raw_contents) if is_sequence else [raw_contents]
def _process_content_item(item: Any) -> List[Any]:
"""Process a single content item and return a list of clean streams."""
# Case: Raw bytes
if isinstance(item, bytes):
return [item]
# Case: Something that can read_bytes (Stream-like or Dictionary-like)
if hasattr(item, "read_bytes"):
return _handle_stream_like_item(item)
# Unknown / irrelevant item
return []
def _handle_stream_like_item(item: Any) -> List[Any]:
    """Handle streams, dictionaries masquerading as streams, and nested Pages.

    Args:
        item: A content entry that exposes ``read_bytes``.

    Returns:
        The streams contributed by ``item``: the nested ``/Contents`` of a
        Page dictionary, the result of ``_handle_invalid_stream_like`` for
        any other plain dictionary, or ``[item]`` otherwise.
    """
    try:
        # A Page dictionary is never read directly; its /Contents are used.
        if _is_page_dict(item):
            return _extract_page_dict_contents(item)
        # A plain dictionary is not a stream even though it exposes
        # ``read_bytes``. Send it to the fallback, which descends into
        # /Contents when present and otherwise logs and skips the item,
        # instead of passing it on as if it were a content stream.
        if isinstance(item, pikepdf.Dictionary):
            return _handle_invalid_stream_like(item)
        return [item]
    except pikepdf.PdfError:
        return _handle_invalid_stream_like(item)
def _is_page_dict(item: Any) -> bool:
    """Tell whether ``item`` is a pikepdf Dictionary whose /Type is /Page."""
    if not isinstance(item, pikepdf.Dictionary):
        return False
    if "/Type" not in item:
        return False
    return item["/Type"] == "/Page"
def _extract_page_dict_contents(item: pikepdf.Dictionary) -> List[Any]:
    """Return the clean streams found under the /Contents of a Page dictionary.

    A dictionary without a ``/Contents`` entry yields an empty list.
    """
    if "/Contents" not in item:
        return []
    return _get_clean_content_streams(item["/Contents"])
def _handle_invalid_stream_like(item: Any) -> List[Any]:
    """Fallback for a content item that could not be used as a stream.

    A Dictionary holding ``/Contents`` is resolved through that entry. Any
    other item is reported with a warning and contributes no streams.
    """
    has_contents = isinstance(item, pikepdf.Dictionary) and "/Contents" in item
    if not has_contents:
        logger.warning("Skipping invalid content item (not a stream): %r", item)
        return []
    return _get_clean_content_streams(item["/Contents"])
def process(
    pdf: pikepdf.Pdf,
    options: Optional[ProcessingOptions] = None,
    registry: Optional[HandlerRegistry] = None,
    pages: Union[None, int, pikepdf.Page, List[Union[int, pikepdf.Page]]] = None,
    page: Union[None, int, pikepdf.Page] = None,
) -> None:
    """High-level entry point to modify PDF content.

    Args:
        pdf: The :class:`pikepdf.Pdf` object to process.
        options: Configuration options. When ``None``, a single default
            :class:`ProcessingOptions` is created and used for every page of
            this call.
        registry: The :class:`~pdfbeaver.registry.HandlerRegistry` to use.
            Defaults to the global ``default_registry``.
        pages: The pages to process. Can be a single integer (0-indexed),
            a single Page object, a list of integers/Pages, or None
            (processes all pages). An empty list processes no pages.
        page: Alias for ``pages`` (kept for backward compatibility). Only
            consulted when ``pages`` is ``None``.

    Raises:
        TypeError: If ``pdf`` is not a pikepdf object or ``pages`` contains
            invalid types.
    """
    if not isinstance(pdf, pikepdf.Pdf):
        raise TypeError("The 'pdf' argument must be a pikepdf.Pdf object.")

    # Use the global default if none provided.
    if registry is None:
        registry = default_registry

    # Create the defaults once rather than per page, so that
    # ``visited_streams`` is shared by all pages of this call. A Form XObject
    # referenced from several pages is then rewritten once, exactly as it is
    # when the caller supplies the options.
    if options is None:
        options = ProcessingOptions()

    # Compare against None explicitly: page index 0 is falsy, and
    # ``pages or page`` would silently turn it into "all pages".
    selection = pages if pages is not None else page

    for page_to_process in _resolve_pages(pdf, selection):
        modify_page(pdf, page_to_process, registry, options)
def _resolve_pages(pdf: pikepdf.Pdf, pages_arg) -> List[pikepdf.Page]:
    """Turn the flexible ``pages`` argument into a list of Page objects.

    ``None`` selects every page of ``pdf``. An integer is used as an index
    into ``pdf.pages``, and a Page is taken as is. A list or tuple may mix
    integers and Pages.

    Raises:
        TypeError: If ``pages_arg``, or an entry of a list/tuple, has an
            unsupported type.
    """
    if pages_arg is None:
        return list(pdf.pages)
    if isinstance(pages_arg, int):
        return [pdf.pages[pages_arg]]
    if isinstance(pages_arg, pikepdf.Page):
        return [pages_arg]
    if not isinstance(pages_arg, (list, tuple)):
        raise TypeError(f"Invalid type for 'pages' argument: {type(pages_arg)}")

    def _lookup(entry):
        # Resolve a single list entry to a Page.
        if isinstance(entry, int):
            return pdf.pages[entry]
        if isinstance(entry, pikepdf.Page):
            return entry
        raise TypeError(f"Invalid item in 'pages' list: {type(entry)}")

    return [_lookup(entry) for entry in pages_arg]
def _convert_to_pdfminer_resources(obj: Any, strip_slash=False, _active=None) -> Any:
    """Recursively convert pikepdf resources to types pdfminer understands.

    Args:
        obj: The pikepdf (or plain Python) value to convert.
        strip_slash: If True and ``obj`` is string-like, a leading ``/`` is
            removed. Used for dictionary keys.
        _active: Internal. The ``objgen`` identifiers of the indirect
            containers currently being converted on this recursion path.
            Callers should leave this at its default.

    Returns:
        A ``dict`` for dictionaries, a ``list`` for arrays, a
        :class:`PDFStream` for streams, a ``str`` for strings, a pdfminer
        literal for names and a ``float`` for ``Decimal`` values. Other
        values are returned unchanged. A container that refers back to one
        of its own ancestors is replaced by ``None``.
    """
    if _active is None:
        _active = set()

    def convert(value: Any, strip: bool = False) -> Any:
        return _convert_to_pdfminer_resources(value, strip, _active)

    def convert_dict(mapping: Any) -> Dict[Any, Any]:
        # Keys arrive as "/Name" strings; pdfminer expects them without the
        # leading slash.
        return {convert(k, True): convert(v) for k, v in mapping.items()}

    if isinstance(obj, (pikepdf.Dictionary, pikepdf.Array, pikepdf.Stream)):
        # Guard against cyclic references (e.g. a Form XObject whose
        # resources lead back to itself), which would otherwise recurse
        # without bound. Only indirect objects can take part in a cycle;
        # direct objects all report (0, 0) and are not tracked.
        objgen = getattr(obj, "objgen", (0, 0))
        is_indirect = objgen != (0, 0)
        if is_indirect:
            if objgen in _active:
                logger.warning(
                    "Cyclic reference to object %s in resources; dropping it",
                    objgen,
                )
                return None
            _active.add(objgen)
        try:
            if isinstance(obj, pikepdf.Dictionary):
                return convert_dict(obj)
            if isinstance(obj, pikepdf.Array):
                return [convert(v) for v in obj]
            # Stream: the stream dictionary belongs to the stream object
            # already being tracked, so convert its entries directly
            # instead of running the cycle check on it again.
            attrs = convert_dict(obj.stream_dict)
            # We pass the original raw (probably compressed) bytes.
            # This is probably not very efficient?
            # Might be better to pass the decompressed bytes (with
            # read_bytes); we'd need to fix up the stream dictionary attrs
            # in that case, at least removing any /Filter.
            return PDFStream(attrs, obj.read_raw_bytes())
        finally:
            # Track the current path only, so that an object shared by
            # siblings is still converted for each of them.
            if is_indirect:
                _active.discard(objgen)

    if isinstance(obj, (str, pikepdf.String)):
        text = str(obj)
        if strip_slash and text.startswith("/"):
            return text[1:]
        return text
    if isinstance(obj, pikepdf.Name):
        # Names become pdfminer literals without the leading slash.
        return LIT(str(obj)[1:])
    if isinstance(obj, Decimal):
        return float(obj)
    return obj