From 57b9673efb1b4fd18a3ac15e26da642201e2cd33 Mon Sep 17 00:00:00 2001 From: Markus Heiser Date: Fri, 22 Aug 2025 17:17:51 +0200 Subject: [mod] addition of various type hints / tbc - pyright configuration [1]_ - stub files: types-lxml [2]_ - addition of various type hints - enable use of new type system features on older Python versions [3]_ - ``.tool-versions`` - set python to lowest version we support (3.10.18) [4]_: Older versions typically lack some typing features found in newer Python versions. Therefore, for local type checking (before commit), it is necessary to use the older Python interpreter. .. [1] https://docs.basedpyright.com/v1.20.0/configuration/config-files/ .. [2] https://pypi.org/project/types-lxml/ .. [3] https://typing-extensions.readthedocs.io/en/latest/# .. [4] https://mise.jdx.dev/configuration.html#tool-versions Signed-off-by: Markus Heiser Format: reST --- searx/utils.py | 214 ++++++++++++++++++++++++++++----------------------------- 1 file changed, 105 insertions(+), 109 deletions(-) (limited to 'searx/utils.py') diff --git a/searx/utils.py b/searx/utils.py index dff3eb4f4..7196f53e4 100644 --- a/searx/utils.py +++ b/searx/utils.py @@ -9,7 +9,9 @@ import importlib.util import json import types -from typing import Optional, Union, Any, Set, List, Dict, MutableMapping, Tuple, Callable +import typing as t +from collections.abc import MutableMapping, Callable + from numbers import Number from os.path import splitext, join from random import choice @@ -29,10 +31,15 @@ from searx.sxng_locales import sxng_locales from searx.exceptions import SearxXPathSyntaxException, SearxEngineXPathException from searx import logger +if t.TYPE_CHECKING: + import fasttext.FastText # type: ignore + logger = logger.getChild('utils') -XPathSpecType = Union[str, XPath] +XPathSpecType: t.TypeAlias = str | XPath +"""Type alias used by :py:obj:`searx.utils.get_xpath`, +:py:obj:`searx.utils.eval_xpath` and other XPath selectors.""" _BLOCKED_TAGS = ('script', 'style') @@ -43,10 +50,10 @@ _JS_QUOTE_KEYS_RE = re.compile(r'([\{\s,])(\w+)(:)') _JS_VOID_RE = re.compile(r'void\s+[0-9]+|void\s*\([0-9]+\)') _JS_DECIMAL_RE = re.compile(r":\s*\.") -_XPATH_CACHE: Dict[str, XPath] = {} -_LANG_TO_LC_CACHE: Dict[str, Dict[str, str]] = {} +_XPATH_CACHE: dict[str, XPath] = {} +_LANG_TO_LC_CACHE: dict[str, dict[str, str]] = {} -_FASTTEXT_MODEL: Optional["fasttext.FastText._FastText"] = None # type: ignore +_FASTTEXT_MODEL: "fasttext.FastText._FastText | None" = None # pyright: ignore[reportPrivateUsage] """fasttext model to predict language of a search term""" SEARCH_LANGUAGE_CODES = frozenset([searxng_locale[0].split('-')[0] for searxng_locale in sxng_locales]) @@ -66,12 +73,15 @@ def searxng_useragent() -> str: return f"SearXNG/{VERSION_TAG} {settings['outgoing']['useragent_suffix']}".strip() -def gen_useragent(os_string: Optional[str] = None) -> str: +def gen_useragent(os_string: str | None = None) -> str: """Return a random browser User Agent See searx/data/useragents.json """ - return USER_AGENTS['ua'].format(os=os_string or choice(USER_AGENTS['os']), version=choice(USER_AGENTS['versions'])) + return USER_AGENTS['ua'].format( + os=os_string or choice(USER_AGENTS['os']), + version=choice(USER_AGENTS['versions']), + ) class HTMLTextExtractor(HTMLParser): @@ -79,15 +89,15 @@ class HTMLTextExtractor(HTMLParser): def __init__(self): HTMLParser.__init__(self) - self.result = [] - self.tags = [] + self.result: list[str] = [] + self.tags: list[str] = [] - def handle_starttag(self, tag, attrs): + def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: self.tags.append(tag) if tag == 'br': self.result.append(' ') - def handle_endtag(self, tag): + def handle_endtag(self, tag: str) -> None: if not self.tags: return @@ -100,12 +110,12 @@ class HTMLTextExtractor(HTMLParser): def is_valid_tag(self): return not self.tags or self.tags[-1] not in _BLOCKED_TAGS - def handle_data(self, data): + def handle_data(self, data: str) -> None: if not self.is_valid_tag(): return self.result.append(data) - def handle_charref(self, name): + def handle_charref(self, name: str) -> None: if not self.is_valid_tag(): return if name[0] in ('x', 'X'): @@ -114,7 +124,7 @@ class HTMLTextExtractor(HTMLParser): codepoint = int(name) self.result.append(chr(codepoint)) - def handle_entityref(self, name): + def handle_entityref(self, name: str) -> None: if not self.is_valid_tag(): return # codepoint = htmlentitydefs.name2codepoint[name] @@ -124,7 +134,7 @@ class HTMLTextExtractor(HTMLParser): def get_text(self): return ''.join(self.result).strip() - def error(self, message): + def error(self, message: str) -> None: # error handle is needed in str: 'Headline' """ - html_str = ( + html_str: str = ( MarkdownIt("commonmark", {"typographer": True}).enable(["replacements", "smartquotes"]).render(markdown_str) ) return html_to_text(html_str) -def extract_text(xpath_results, allow_none: bool = False) -> Optional[str]: +def extract_text( + xpath_results: list[ElementBase] | ElementBase | str | Number | bool | None, + allow_none: bool = False, +) -> str | None: """Extract text from a lxml result * if xpath_results is list, extract the text from each result and concat the list @@ -210,9 +223,14 @@ def extract_text(xpath_results, allow_none: bool = False) -> Optional[str]: return result.strip() if isinstance(xpath_results, ElementBase): # it's a element - text: str = html.tostring(xpath_results, encoding='unicode', method='text', with_tail=False) - text = text.strip().replace('\n', ' ') - return ' '.join(text.split()) + text: str = html.tostring( # type: ignore + xpath_results, # pyright: ignore[reportArgumentType] + encoding='unicode', + method='text', + with_tail=False, + ) + text = text.strip().replace('\n', ' ') # type: ignore + return ' '.join(text.split()) # type: ignore if isinstance(xpath_results, (str, Number, bool)): return str(xpath_results) if xpath_results is None and allow_none: @@ -272,13 +290,9 @@ def normalize_url(url: str, base_url: str) -> str: return url -def extract_url(xpath_results, base_url) -> str: +def extract_url(xpath_results: list[ElementBase] | ElementBase | str | Number | bool | None, base_url: str) -> str: """Extract and normalize URL from lxml Element - Args: - * xpath_results (Union[List[html.HtmlElement], html.HtmlElement]): lxml Element(s) - * base_url (str): Base URL - Example: >>> def f(s, search_url): >>> return searx.utils.extract_url(html.fromstring(s), search_url) @@ -313,7 +327,7 @@ def extract_url(xpath_results, base_url) -> str: raise ValueError('URL not found') -def dict_subset(dictionary: MutableMapping, properties: Set[str]) -> Dict: +def dict_subset(dictionary: MutableMapping[t.Any, t.Any], properties: set[str]) -> MutableMapping[str, t.Any]: """Extract a subset of a dict Examples: @@ -325,7 +339,7 @@ def dict_subset(dictionary: MutableMapping, properties: Set[str]) -> Dict: return {k: dictionary[k] for k in properties if k in dictionary} -def humanize_bytes(size, precision=2): +def humanize_bytes(size: int | float, precision: int = 2): """Determine the *human readable* value of bytes on 1024 base (1KB=1024B).""" s = ['B ', 'KB', 'MB', 'GB', 'TB'] @@ -337,7 +351,7 @@ def humanize_bytes(size, precision=2): return "%.*f %s" % (precision, size, s[p]) -def humanize_number(size, precision=0): +def humanize_number(size: int | float, precision: int = 0): """Determine the *human readable* value of a decimal number.""" s = ['', 'K', 'M', 'B', 'T'] @@ -385,7 +399,7 @@ def extr(txt: str, begin: str, end: str, default: str = ""): return default -def int_or_zero(num: Union[List[str], str]) -> int: +def int_or_zero(num: list[str] | str) -> int: """Convert num to int or 0. num can be either a str or a list. If num is a list, the first element is converted to int (or return 0 if the list is empty). If num is a str, see convert_str_to_int @@ -397,7 +411,7 @@ def int_or_zero(num: Union[List[str], str]) -> int: return convert_str_to_int(num) -def is_valid_lang(lang) -> Optional[Tuple[bool, str, str]]: +def is_valid_lang(lang: str) -> tuple[bool, str, str] | None: """Return language code and name if lang describe a language. Examples: @@ -443,7 +457,7 @@ def load_module(filename: str, module_dir: str) -> types.ModuleType: return module -def to_string(obj: Any) -> str: +def to_string(obj: t.Any) -> str: """Convert obj to its string representation.""" if isinstance(obj, str): return obj @@ -473,13 +487,13 @@ def ecma_unescape(string: str) -> str: return string -def remove_pua_from_str(string): +def remove_pua_from_str(string: str): """Removes unicode's "PRIVATE USE CHARACTER"s (PUA_) from a string. .. _PUA: https://en.wikipedia.org/wiki/Private_Use_Areas """ pua_ranges = ((0xE000, 0xF8FF), (0xF0000, 0xFFFFD), (0x100000, 0x10FFFD)) - s = [] + s: list[str] = [] for c in string: i = ord(c) if any(a <= i <= b for (a, b) in pua_ranges): @@ -488,17 +502,17 @@ def remove_pua_from_str(string): return "".join(s) -def get_string_replaces_function(replaces: Dict[str, str]) -> Callable[[str], str]: +def get_string_replaces_function(replaces: dict[str, str]) -> Callable[[str], str]: rep = {re.escape(k): v for k, v in replaces.items()} pattern = re.compile("|".join(rep.keys())) - def func(text): + def func(text: str): return pattern.sub(lambda m: rep[re.escape(m.group(0))], text) return func -def get_engine_from_settings(name: str) -> Dict: +def get_engine_from_settings(name: str) -> dict[str, dict[str, str]]: """Return engine configuration from settings.yml of a given engine name""" if 'engines' not in settings: @@ -514,20 +528,14 @@ def get_engine_from_settings(name: str) -> Dict: def get_xpath(xpath_spec: XPathSpecType) -> XPath: - """Return cached compiled XPath - - There is no thread lock. - Worst case scenario, xpath_str is compiled more than one time. + """Return cached compiled :py:obj:`lxml.etree.XPath` object. - Args: - * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath - - Returns: - * result (bool, float, list, str): Results. + ``TypeError``: + Raised when ``xpath_spec`` is neither a :py:obj:`str` nor a + :py:obj:`lxml.etree.XPath`. - Raises: - * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath - * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath + ``SearxXPathSyntaxException``: + Raised when there is a syntax error in the *XPath* selector (``str``). """ if isinstance(xpath_spec, str): result = _XPATH_CACHE.get(xpath_spec, None) @@ -542,49 +550,42 @@ def get_xpath(xpath_spec: XPathSpecType) -> XPath: if isinstance(xpath_spec, XPath): return xpath_spec - raise TypeError('xpath_spec must be either a str or a lxml.etree.XPath') + raise TypeError('xpath_spec must be either a str or a lxml.etree.XPath') # pyright: ignore[reportUnreachable] -def eval_xpath(element: ElementBase, xpath_spec: XPathSpecType): - """Equivalent of element.xpath(xpath_str) but compile xpath_str once for all. - See https://lxml.de/xpathxslt.html#xpath-return-values +def eval_xpath(element: ElementBase, xpath_spec: XPathSpecType) -> t.Any: + """Equivalent of ``element.xpath(xpath_str)`` but compile ``xpath_str`` into + a :py:obj:`lxml.etree.XPath` object once for all. The return value of + ``xpath(..)`` is complex, read `XPath return values`_ for more details. - Args: - * element (ElementBase): [description] - * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath + .. _XPath return values: + https://lxml.de/xpathxslt.html#xpath-return-values - Returns: - * result (bool, float, list, str): Results. + ``TypeError``: + Raised when ``xpath_spec`` is neither a :py:obj:`str` nor a + :py:obj:`lxml.etree.XPath`. - Raises: - * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath - * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath - * SearxEngineXPathException: Raise when the XPath can't be evaluated. + ``SearxXPathSyntaxException``: + Raised when there is a syntax error in the *XPath* selector (``str``). + + ``SearxEngineXPathException:`` + Raised when the XPath can't be evaluated (masked + :py:obj:`lxml.etree..XPathError`). """ - xpath = get_xpath(xpath_spec) + xpath: XPath = get_xpath(xpath_spec) try: + # https://lxml.de/xpathxslt.html#xpath-return-values return xpath(element) except XPathError as e: arg = ' '.join([str(i) for i in e.args]) raise SearxEngineXPathException(xpath_spec, arg) from e -def eval_xpath_list(element: ElementBase, xpath_spec: XPathSpecType, min_len: Optional[int] = None): - """Same as eval_xpath, check if the result is a list - - Args: - * element (ElementBase): [description] - * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath - * min_len (int, optional): [description]. Defaults to None. +def eval_xpath_list(element: ElementBase, xpath_spec: XPathSpecType, min_len: int | None = None) -> list[t.Any]: + """Same as :py:obj:`searx.utils.eval_xpath`, but additionally ensures the + return value is a :py:obj:`list`. The minimum length of the list is also + checked (if ``min_len`` is set).""" - Raises: - * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath - * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath - * SearxEngineXPathException: raise if the result is not a list - - Returns: - * result (bool, float, list, str): Results. - """ result = eval_xpath(element, xpath_spec) if not isinstance(result, list): raise SearxEngineXPathException(xpath_spec, 'the result is not a list') @@ -593,47 +594,42 @@ def eval_xpath_list(element: ElementBase, xpath_spec: XPathSpecType, min_len: Op return result -def eval_xpath_getindex(elements: ElementBase, xpath_spec: XPathSpecType, index: int, default=_NOTSET): - """Call eval_xpath_list then get one element using the index parameter. - If the index does not exist, either raise an exception is default is not set, - other return the default value (can be None). +def eval_xpath_getindex( + element: ElementBase, + xpath_spec: XPathSpecType, + index: int, + default: t.Any = _NOTSET, +) -> t.Any: + """Same as :py:obj:`searx.utils.eval_xpath_list`, but returns item on + position ``index`` from the list (index starts with ``0``). - Args: - * elements (ElementBase): lxml element to apply the xpath. - * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath. - * index (int): index to get - * default (Object, optional): Defaults if index doesn't exist. - - Raises: - * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath - * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath - * SearxEngineXPathException: if the index is not found. Also see eval_xpath. - - Returns: - * result (bool, float, list, str): Results. + The exceptions known from :py:obj:`searx.utils.eval_xpath` are thrown. If a + default is specified, this is returned if an element at position ``index`` + could not be determined. """ - result = eval_xpath_list(elements, xpath_spec) + + result = eval_xpath_list(element, xpath_spec) if -len(result) <= index < len(result): return result[index] if default == _NOTSET: - # raise an SearxEngineXPathException instead of IndexError - # to record xpath_spec + # raise an SearxEngineXPathException instead of IndexError to record + # xpath_spec raise SearxEngineXPathException(xpath_spec, 'index ' + str(index) + ' not found') return default -def _get_fasttext_model() -> "fasttext.FastText._FastText": # type: ignore +def _get_fasttext_model() -> "fasttext.FastText._FastText": # pyright: ignore[reportPrivateUsage] global _FASTTEXT_MODEL # pylint: disable=global-statement if _FASTTEXT_MODEL is None: import fasttext # pylint: disable=import-outside-toplevel # Monkey patch: prevent fasttext from showing a (useless) warning when loading a model. - fasttext.FastText.eprint = lambda x: None - _FASTTEXT_MODEL = fasttext.load_model(str(data_dir / 'lid.176.ftz')) + fasttext.FastText.eprint = lambda x: None # type: ignore + _FASTTEXT_MODEL = fasttext.load_model(str(data_dir / 'lid.176.ftz')) # type: ignore return _FASTTEXT_MODEL -def get_embeded_stream_url(url): +def get_embeded_stream_url(url: str): """ Converts a standard video URL into its embed format. Supported services include Youtube, Facebook, Instagram, TikTok, Dailymotion, and Bilibili. @@ -695,7 +691,7 @@ def get_embeded_stream_url(url): return iframe_src -def detect_language(text: str, threshold: float = 0.3, only_search_languages: bool = False) -> Optional[str]: +def detect_language(text: str, threshold: float = 0.3, only_search_languages: bool = False) -> str | None: """Detect the language of the ``text`` parameter. :param str text: The string whose language is to be detected. @@ -756,17 +752,17 @@ def detect_language(text: str, threshold: float = 0.3, only_search_languages: bo """ if not isinstance(text, str): - raise ValueError('text must a str') - r = _get_fasttext_model().predict(text.replace('\n', ' '), k=1, threshold=threshold) - if isinstance(r, tuple) and len(r) == 2 and len(r[0]) > 0 and len(r[1]) > 0: - language = r[0][0].split('__label__')[1] + raise ValueError('text must a str') # pyright: ignore[reportUnreachable] + r = _get_fasttext_model().predict(text.replace('\n', ' '), k=1, threshold=threshold) # type: ignore + if isinstance(r, tuple) and len(r) == 2 and len(r[0]) > 0 and len(r[1]) > 0: # type: ignore + language = r[0][0].split('__label__')[1] # type: ignore if only_search_languages and language not in SEARCH_LANGUAGE_CODES: return None - return language + return language # type: ignore return None -def js_variable_to_python(js_variable): +def js_variable_to_python(js_variable: str) -> str: """Convert a javascript variable into JSON and then load the value It does not deal with all cases, but it is good enough for now. @@ -838,7 +834,7 @@ def js_variable_to_python(js_variable): # {"a": "\"12\"","b": "13"} s = s.replace("',", "\",") # load the JSON and return the result - return json.loads(s) + return json.loads(s) # pyright: ignore[reportAny] def parse_duration_string(duration_str: str) -> timedelta | None: -- cgit v1.2.3