2026-02-01 09:31:38 +01:00

252 lines
6.3 KiB
Python

"""
This module provides the :module:`hyperscan` backend for :class:`~pathspec.pathspec.PathSpec`.
WARNING: The *pathspec._backends.hyperscan* package is not part of the public
API. Its contents and structure are likely to change.
"""
from __future__ import annotations
from collections.abc import (
Sequence)
from typing import (
Any,
Callable, # Replaced by `collections.abc.Callable` in 3.9.2.
Optional) # Replaced by `X | None` in 3.10.
try:
import hyperscan
except ModuleNotFoundError:
hyperscan = None
from pathspec.backend import (
_Backend)
from pathspec.pattern import (
RegexPattern)
from pathspec._typing import (
override) # Added in 3.12.
from .._utils import (
enumerate_patterns)
from .base import (
hyperscan_error)
from ._base import (
HS_FLAGS,
HyperscanExprDat,
HyperscanExprDebug)
class HyperscanPsBackend(_Backend):
"""
The :class:`HyperscanPsBackend` class is the :module:`hyperscan`
implementation used by :class:`~pathspec.pathspec.PathSpec` for matching
files. The Hyperscan database uses block mode for matching files.
"""
def __init__(
self,
patterns: Sequence[RegexPattern],
*,
_debug_exprs: Optional[bool] = None,
_test_sort: Optional[Callable[[list], None]] = None,
) -> None:
"""
Initialize the :class:`HyperscanPsBackend` instance.
*patterns* (:class:`Sequence` of :class:`.RegexPattern`) contains the
compiled patterns.
"""
if hyperscan is None:
raise hyperscan_error
if patterns and not isinstance(patterns[0], RegexPattern):
raise TypeError(f"{patterns[0]=!r} must be a RegexPattern.")
use_patterns = enumerate_patterns(
patterns, filter=True, reverse=False,
)
debug_exprs = bool(_debug_exprs)
if use_patterns:
db = self._make_db()
expr_data = self._init_db(
db=db,
debug=debug_exprs,
patterns=use_patterns,
sort_ids=_test_sort,
)
else:
# WARNING: The hyperscan database cannot be initialized with zero
# patterns.
db = None
expr_data = []
self._db: Optional[hyperscan.Database] = db
"""
*_db* (:class:`hyperscan.Database`) is the Hyperscan database.
"""
self._debug_exprs = debug_exprs
"""
*_debug_exprs* (:class:`bool`) is whether to include additional debugging
information for the expressions.
"""
self._expr_data: list[HyperscanExprDat] = expr_data
"""
*_expr_data* (:class:`list`) maps expression index (:class:`int`) to
expression data (:class:`:class:`HyperscanExprDat`).
"""
self._out: tuple[Optional[bool], int] = (None, -1)
"""
*_out* (:class:`tuple`) stores the current match:
- *0* (:class:`bool` or :data:`None`) is the match include.
- *1* (:class:`int`) is the match index.
"""
self._patterns: dict[int, RegexPattern] = dict(use_patterns)
"""
*_patterns* (:class:`dict`) maps pattern index (:class:`int`) to pattern
(:class:`RegexPattern`).
"""
@staticmethod
def _init_db(
db: hyperscan.Database,
debug: bool,
patterns: list[tuple[int, RegexPattern]],
sort_ids: Optional[Callable[[list[int]], None]],
) -> list[HyperscanExprDat]:
"""
Initialize the Hyperscan database from the given patterns.
*db* (:class:`hyperscan.Hyperscan`) is the Hyperscan database.
*debug* (:class:`bool`) is whether to include additional debugging
information for the expressions.
*patterns* (:class:`~collections.abc.Sequence` of :class:`.RegexPattern`)
contains the patterns.
*sort_ids* (:class:`callable` or :data:`None`) is a function used to sort
the compiled expression ids. This is used during testing to ensure the order
of expressions is not accidentally relied on.
Returns a :class:`list` indexed by expression id (:class:`int`) to its data
(:class:`HyperscanExprDat`).
"""
# WARNING: Hyperscan raises a `hyperscan.error` exception when compiled with
# zero elements.
assert patterns, patterns
# Prepare patterns.
expr_data: list[HyperscanExprDat] = []
exprs: list[bytes] = []
for pattern_index, pattern in patterns:
assert pattern.include is not None, (pattern_index, pattern)
# Encode regex.
assert isinstance(pattern, RegexPattern), pattern
regex = pattern.regex.pattern
if isinstance(regex, bytes):
regex_bytes = regex
else:
assert isinstance(regex, str), regex
regex_bytes = regex.encode('utf8')
if debug:
expr_data.append(HyperscanExprDebug(
include=pattern.include,
index=pattern_index,
is_dir_pattern=False,
regex=regex,
))
else:
expr_data.append(HyperscanExprDat(
include=pattern.include,
index=pattern_index,
is_dir_pattern=False,
))
exprs.append(regex_bytes)
# Sort expressions.
ids = list(range(len(exprs)))
if sort_ids is not None:
sort_ids(ids)
exprs = [exprs[__id] for __id in ids]
# Compile patterns.
db.compile(
expressions=exprs,
ids=ids,
elements=len(exprs),
flags=HS_FLAGS,
)
return expr_data
@override
def match_file(self, file: str) -> tuple[Optional[bool], Optional[int]]:
"""
Check the file against the patterns.
*file* (:class:`str`) is the normalized file path to check.
Returns a :class:`tuple` containing whether to include *file* (:class:`bool`
or :data:`None`), and the index of the last matched pattern (:class:`int` or
:data:`None`).
"""
# NOTICE: According to benchmarking, a method callback is 20% faster than
# using a closure here.
db = self._db
if self._db is None:
# Database was not initialized because there were no patterns. Return no
# match.
return (None, None)
self._out = (None, -1)
db.scan(file.encode('utf8'), match_event_handler=self.__on_match)
out_include, out_index = self._out
if out_index == -1:
out_index = None
return (out_include, out_index)
@staticmethod
def _make_db() -> hyperscan.Database:
"""
Create the Hyperscan database.
Returns the database (:class:`hyperscan.Database`).
"""
return hyperscan.Database(mode=hyperscan.HS_MODE_BLOCK)
def __on_match(
self,
expr_id: int,
_from: int,
_to: int,
_flags: int,
_context: Any,
) -> Optional[bool]:
"""
Called on each match.
*expr_id* (:class:`int`) is the expression id (index) of the matched
pattern.
"""
# Store match.
# - WARNING: Hyperscan does not guarantee matches will be produced in order!
# Later expressions have higher priority.
expr_dat = self._expr_data[expr_id]
index = expr_dat.index
prev_index = self._out[1]
if index > prev_index:
self._out = (expr_dat.include, index)