summaryrefslogtreecommitdiff
path: root/searx/utils.py
diff options
context:
space:
mode:
authorMarkus Heiser <markus.heiser@darmarit.de>2025-08-22 17:17:51 +0200
committerMarkus Heiser <markus.heiser@darmarIT.de>2025-09-03 13:37:36 +0200
commit57b9673efb1b4fd18a3ac15e26da642201e2cd33 (patch)
tree79d3ecd365a1669a1109aa7e5dd3636bc1041d96 /searx/utils.py
parent09500459feffa414dc7a0601bdb164464a8b0454 (diff)
[mod] addition of various type hints / tbc
- pyright configuration [1]_ - stub files: types-lxml [2]_ - addition of various type hints - enable use of new type system features on older Python versions [3]_ - ``.tool-versions`` - set python to lowest version we support (3.10.18) [4]_: Older versions typically lack some typing features found in newer Python versions. Therefore, for local type checking (before commit), it is necessary to use the older Python interpreter. .. [1] https://docs.basedpyright.com/v1.20.0/configuration/config-files/ .. [2] https://pypi.org/project/types-lxml/ .. [3] https://typing-extensions.readthedocs.io/en/latest/# .. [4] https://mise.jdx.dev/configuration.html#tool-versions Signed-off-by: Markus Heiser <markus.heiser@darmarit.de> Format: reST
Diffstat (limited to 'searx/utils.py')
-rw-r--r--searx/utils.py214
1 files changed, 105 insertions, 109 deletions
diff --git a/searx/utils.py b/searx/utils.py
index dff3eb4f4..7196f53e4 100644
--- a/searx/utils.py
+++ b/searx/utils.py
@@ -9,7 +9,9 @@ import importlib.util
import json
import types
-from typing import Optional, Union, Any, Set, List, Dict, MutableMapping, Tuple, Callable
+import typing as t
+from collections.abc import MutableMapping, Callable
+
from numbers import Number
from os.path import splitext, join
from random import choice
@@ -29,10 +31,15 @@ from searx.sxng_locales import sxng_locales
from searx.exceptions import SearxXPathSyntaxException, SearxEngineXPathException
from searx import logger
+if t.TYPE_CHECKING:
+ import fasttext.FastText # type: ignore
+
logger = logger.getChild('utils')
-XPathSpecType = Union[str, XPath]
+XPathSpecType: t.TypeAlias = str | XPath
+"""Type alias used by :py:obj:`searx.utils.get_xpath`,
+:py:obj:`searx.utils.eval_xpath` and other XPath selectors."""
_BLOCKED_TAGS = ('script', 'style')
@@ -43,10 +50,10 @@ _JS_QUOTE_KEYS_RE = re.compile(r'([\{\s,])(\w+)(:)')
_JS_VOID_RE = re.compile(r'void\s+[0-9]+|void\s*\([0-9]+\)')
_JS_DECIMAL_RE = re.compile(r":\s*\.")
-_XPATH_CACHE: Dict[str, XPath] = {}
-_LANG_TO_LC_CACHE: Dict[str, Dict[str, str]] = {}
+_XPATH_CACHE: dict[str, XPath] = {}
+_LANG_TO_LC_CACHE: dict[str, dict[str, str]] = {}
-_FASTTEXT_MODEL: Optional["fasttext.FastText._FastText"] = None # type: ignore
+_FASTTEXT_MODEL: "fasttext.FastText._FastText | None" = None # pyright: ignore[reportPrivateUsage]
"""fasttext model to predict language of a search term"""
SEARCH_LANGUAGE_CODES = frozenset([searxng_locale[0].split('-')[0] for searxng_locale in sxng_locales])
@@ -66,12 +73,15 @@ def searxng_useragent() -> str:
return f"SearXNG/{VERSION_TAG} {settings['outgoing']['useragent_suffix']}".strip()
-def gen_useragent(os_string: Optional[str] = None) -> str:
+def gen_useragent(os_string: str | None = None) -> str:
"""Return a random browser User Agent
See searx/data/useragents.json
"""
- return USER_AGENTS['ua'].format(os=os_string or choice(USER_AGENTS['os']), version=choice(USER_AGENTS['versions']))
+ return USER_AGENTS['ua'].format(
+ os=os_string or choice(USER_AGENTS['os']),
+ version=choice(USER_AGENTS['versions']),
+ )
class HTMLTextExtractor(HTMLParser):
@@ -79,15 +89,15 @@ class HTMLTextExtractor(HTMLParser):
def __init__(self):
HTMLParser.__init__(self)
- self.result = []
- self.tags = []
+ self.result: list[str] = []
+ self.tags: list[str] = []
- def handle_starttag(self, tag, attrs):
+ def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
self.tags.append(tag)
if tag == 'br':
self.result.append(' ')
- def handle_endtag(self, tag):
+ def handle_endtag(self, tag: str) -> None:
if not self.tags:
return
@@ -100,12 +110,12 @@ class HTMLTextExtractor(HTMLParser):
def is_valid_tag(self):
return not self.tags or self.tags[-1] not in _BLOCKED_TAGS
- def handle_data(self, data):
+ def handle_data(self, data: str) -> None:
if not self.is_valid_tag():
return
self.result.append(data)
- def handle_charref(self, name):
+ def handle_charref(self, name: str) -> None:
if not self.is_valid_tag():
return
if name[0] in ('x', 'X'):
@@ -114,7 +124,7 @@ class HTMLTextExtractor(HTMLParser):
codepoint = int(name)
self.result.append(chr(codepoint))
- def handle_entityref(self, name):
+ def handle_entityref(self, name: str) -> None:
if not self.is_valid_tag():
return
# codepoint = htmlentitydefs.name2codepoint[name]
@@ -124,7 +134,7 @@ class HTMLTextExtractor(HTMLParser):
def get_text(self):
return ''.join(self.result).strip()
- def error(self, message):
+ def error(self, message: str) -> None:
# error handle is needed in <py3.10
# https://github.com/python/cpython/pull/8562/files
raise AssertionError(message)
@@ -188,13 +198,16 @@ def markdown_to_text(markdown_str: str) -> str:
'Headline'
"""
- html_str = (
+ html_str: str = (
MarkdownIt("commonmark", {"typographer": True}).enable(["replacements", "smartquotes"]).render(markdown_str)
)
return html_to_text(html_str)
-def extract_text(xpath_results, allow_none: bool = False) -> Optional[str]:
+def extract_text(
+ xpath_results: list[ElementBase] | ElementBase | str | Number | bool | None,
+ allow_none: bool = False,
+) -> str | None:
"""Extract text from a lxml result
* if xpath_results is list, extract the text from each result and concat the list
@@ -210,9 +223,14 @@ def extract_text(xpath_results, allow_none: bool = False) -> Optional[str]:
return result.strip()
if isinstance(xpath_results, ElementBase):
# it's a element
- text: str = html.tostring(xpath_results, encoding='unicode', method='text', with_tail=False)
- text = text.strip().replace('\n', ' ')
- return ' '.join(text.split())
+ text: str = html.tostring( # type: ignore
+ xpath_results, # pyright: ignore[reportArgumentType]
+ encoding='unicode',
+ method='text',
+ with_tail=False,
+ )
+ text = text.strip().replace('\n', ' ') # type: ignore
+ return ' '.join(text.split()) # type: ignore
if isinstance(xpath_results, (str, Number, bool)):
return str(xpath_results)
if xpath_results is None and allow_none:
@@ -272,13 +290,9 @@ def normalize_url(url: str, base_url: str) -> str:
return url
-def extract_url(xpath_results, base_url) -> str:
+def extract_url(xpath_results: list[ElementBase] | ElementBase | str | Number | bool | None, base_url: str) -> str:
"""Extract and normalize URL from lxml Element
- Args:
- * xpath_results (Union[List[html.HtmlElement], html.HtmlElement]): lxml Element(s)
- * base_url (str): Base URL
-
Example:
>>> def f(s, search_url):
>>> return searx.utils.extract_url(html.fromstring(s), search_url)
@@ -313,7 +327,7 @@ def extract_url(xpath_results, base_url) -> str:
raise ValueError('URL not found')
-def dict_subset(dictionary: MutableMapping, properties: Set[str]) -> Dict:
+def dict_subset(dictionary: MutableMapping[t.Any, t.Any], properties: set[str]) -> MutableMapping[str, t.Any]:
"""Extract a subset of a dict
Examples:
@@ -325,7 +339,7 @@ def dict_subset(dictionary: MutableMapping, properties: Set[str]) -> Dict:
return {k: dictionary[k] for k in properties if k in dictionary}
-def humanize_bytes(size, precision=2):
+def humanize_bytes(size: int | float, precision: int = 2):
"""Determine the *human readable* value of bytes on 1024 base (1KB=1024B)."""
s = ['B ', 'KB', 'MB', 'GB', 'TB']
@@ -337,7 +351,7 @@ def humanize_bytes(size, precision=2):
return "%.*f %s" % (precision, size, s[p])
-def humanize_number(size, precision=0):
+def humanize_number(size: int | float, precision: int = 0):
"""Determine the *human readable* value of a decimal number."""
s = ['', 'K', 'M', 'B', 'T']
@@ -385,7 +399,7 @@ def extr(txt: str, begin: str, end: str, default: str = ""):
return default
-def int_or_zero(num: Union[List[str], str]) -> int:
+def int_or_zero(num: list[str] | str) -> int:
"""Convert num to int or 0. num can be either a str or a list.
If num is a list, the first element is converted to int (or return 0 if the list is empty).
If num is a str, see convert_str_to_int
@@ -397,7 +411,7 @@ def int_or_zero(num: Union[List[str], str]) -> int:
return convert_str_to_int(num)
-def is_valid_lang(lang) -> Optional[Tuple[bool, str, str]]:
+def is_valid_lang(lang: str) -> tuple[bool, str, str] | None:
"""Return language code and name if lang describe a language.
Examples:
@@ -443,7 +457,7 @@ def load_module(filename: str, module_dir: str) -> types.ModuleType:
return module
-def to_string(obj: Any) -> str:
+def to_string(obj: t.Any) -> str:
"""Convert obj to its string representation."""
if isinstance(obj, str):
return obj
@@ -473,13 +487,13 @@ def ecma_unescape(string: str) -> str:
return string
-def remove_pua_from_str(string):
+def remove_pua_from_str(string: str):
"""Removes unicode's "PRIVATE USE CHARACTER"s (PUA_) from a string.
.. _PUA: https://en.wikipedia.org/wiki/Private_Use_Areas
"""
pua_ranges = ((0xE000, 0xF8FF), (0xF0000, 0xFFFFD), (0x100000, 0x10FFFD))
- s = []
+ s: list[str] = []
for c in string:
i = ord(c)
if any(a <= i <= b for (a, b) in pua_ranges):
@@ -488,17 +502,17 @@ def remove_pua_from_str(string):
return "".join(s)
-def get_string_replaces_function(replaces: Dict[str, str]) -> Callable[[str], str]:
+def get_string_replaces_function(replaces: dict[str, str]) -> Callable[[str], str]:
rep = {re.escape(k): v for k, v in replaces.items()}
pattern = re.compile("|".join(rep.keys()))
- def func(text):
+ def func(text: str):
return pattern.sub(lambda m: rep[re.escape(m.group(0))], text)
return func
-def get_engine_from_settings(name: str) -> Dict:
+def get_engine_from_settings(name: str) -> dict[str, dict[str, str]]:
"""Return engine configuration from settings.yml of a given engine name"""
if 'engines' not in settings:
@@ -514,20 +528,14 @@ def get_engine_from_settings(name: str) -> Dict:
def get_xpath(xpath_spec: XPathSpecType) -> XPath:
- """Return cached compiled XPath
-
- There is no thread lock.
- Worst case scenario, xpath_str is compiled more than one time.
+ """Return cached compiled :py:obj:`lxml.etree.XPath` object.
- Args:
- * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath
-
- Returns:
- * result (bool, float, list, str): Results.
+ ``TypeError``:
+ Raised when ``xpath_spec`` is neither a :py:obj:`str` nor a
+ :py:obj:`lxml.etree.XPath`.
- Raises:
- * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
- * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
+ ``SearxXPathSyntaxException``:
+ Raised when there is a syntax error in the *XPath* selector (``str``).
"""
if isinstance(xpath_spec, str):
result = _XPATH_CACHE.get(xpath_spec, None)
@@ -542,49 +550,42 @@ def get_xpath(xpath_spec: XPathSpecType) -> XPath:
if isinstance(xpath_spec, XPath):
return xpath_spec
- raise TypeError('xpath_spec must be either a str or a lxml.etree.XPath')
+ raise TypeError('xpath_spec must be either a str or a lxml.etree.XPath') # pyright: ignore[reportUnreachable]
-def eval_xpath(element: ElementBase, xpath_spec: XPathSpecType):
- """Equivalent of element.xpath(xpath_str) but compile xpath_str once for all.
- See https://lxml.de/xpathxslt.html#xpath-return-values
+def eval_xpath(element: ElementBase, xpath_spec: XPathSpecType) -> t.Any:
+ """Equivalent of ``element.xpath(xpath_str)`` but compile ``xpath_str`` into
+ a :py:obj:`lxml.etree.XPath` object once for all. The return value of
+ ``xpath(..)`` is complex, read `XPath return values`_ for more details.
- Args:
- * element (ElementBase): [description]
- * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath
+ .. _XPath return values:
+ https://lxml.de/xpathxslt.html#xpath-return-values
- Returns:
- * result (bool, float, list, str): Results.
+ ``TypeError``:
+ Raised when ``xpath_spec`` is neither a :py:obj:`str` nor a
+ :py:obj:`lxml.etree.XPath`.
- Raises:
- * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
- * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
- * SearxEngineXPathException: Raise when the XPath can't be evaluated.
+ ``SearxXPathSyntaxException``:
+ Raised when there is a syntax error in the *XPath* selector (``str``).
+
+ ``SearxEngineXPathException:``
+ Raised when the XPath can't be evaluated (masked
+ :py:obj:`lxml.etree..XPathError`).
"""
- xpath = get_xpath(xpath_spec)
+ xpath: XPath = get_xpath(xpath_spec)
try:
+ # https://lxml.de/xpathxslt.html#xpath-return-values
return xpath(element)
except XPathError as e:
arg = ' '.join([str(i) for i in e.args])
raise SearxEngineXPathException(xpath_spec, arg) from e
-def eval_xpath_list(element: ElementBase, xpath_spec: XPathSpecType, min_len: Optional[int] = None):
- """Same as eval_xpath, check if the result is a list
-
- Args:
- * element (ElementBase): [description]
- * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath
- * min_len (int, optional): [description]. Defaults to None.
+def eval_xpath_list(element: ElementBase, xpath_spec: XPathSpecType, min_len: int | None = None) -> list[t.Any]:
+ """Same as :py:obj:`searx.utils.eval_xpath`, but additionally ensures the
+ return value is a :py:obj:`list`. The minimum length of the list is also
+ checked (if ``min_len`` is set)."""
- Raises:
- * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
- * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
- * SearxEngineXPathException: raise if the result is not a list
-
- Returns:
- * result (bool, float, list, str): Results.
- """
result = eval_xpath(element, xpath_spec)
if not isinstance(result, list):
raise SearxEngineXPathException(xpath_spec, 'the result is not a list')
@@ -593,47 +594,42 @@ def eval_xpath_list(element: ElementBase, xpath_spec: XPathSpecType, min_len: Op
return result
-def eval_xpath_getindex(elements: ElementBase, xpath_spec: XPathSpecType, index: int, default=_NOTSET):
- """Call eval_xpath_list then get one element using the index parameter.
- If the index does not exist, either raise an exception is default is not set,
- other return the default value (can be None).
+def eval_xpath_getindex(
+ element: ElementBase,
+ xpath_spec: XPathSpecType,
+ index: int,
+ default: t.Any = _NOTSET,
+) -> t.Any:
+ """Same as :py:obj:`searx.utils.eval_xpath_list`, but returns item on
+ position ``index`` from the list (index starts with ``0``).
- Args:
- * elements (ElementBase): lxml element to apply the xpath.
- * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath.
- * index (int): index to get
- * default (Object, optional): Defaults if index doesn't exist.
-
- Raises:
- * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
- * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
- * SearxEngineXPathException: if the index is not found. Also see eval_xpath.
-
- Returns:
- * result (bool, float, list, str): Results.
+ The exceptions known from :py:obj:`searx.utils.eval_xpath` are thrown. If a
+ default is specified, this is returned if an element at position ``index``
+ could not be determined.
"""
- result = eval_xpath_list(elements, xpath_spec)
+
+ result = eval_xpath_list(element, xpath_spec)
if -len(result) <= index < len(result):
return result[index]
if default == _NOTSET:
- # raise an SearxEngineXPathException instead of IndexError
- # to record xpath_spec
+ # raise an SearxEngineXPathException instead of IndexError to record
+ # xpath_spec
raise SearxEngineXPathException(xpath_spec, 'index ' + str(index) + ' not found')
return default
-def _get_fasttext_model() -> "fasttext.FastText._FastText": # type: ignore
+def _get_fasttext_model() -> "fasttext.FastText._FastText": # pyright: ignore[reportPrivateUsage]
global _FASTTEXT_MODEL # pylint: disable=global-statement
if _FASTTEXT_MODEL is None:
import fasttext # pylint: disable=import-outside-toplevel
# Monkey patch: prevent fasttext from showing a (useless) warning when loading a model.
- fasttext.FastText.eprint = lambda x: None
- _FASTTEXT_MODEL = fasttext.load_model(str(data_dir / 'lid.176.ftz'))
+ fasttext.FastText.eprint = lambda x: None # type: ignore
+ _FASTTEXT_MODEL = fasttext.load_model(str(data_dir / 'lid.176.ftz')) # type: ignore
return _FASTTEXT_MODEL
-def get_embeded_stream_url(url):
+def get_embeded_stream_url(url: str):
"""
Converts a standard video URL into its embed format. Supported services include Youtube,
Facebook, Instagram, TikTok, Dailymotion, and Bilibili.
@@ -695,7 +691,7 @@ def get_embeded_stream_url(url):
return iframe_src
-def detect_language(text: str, threshold: float = 0.3, only_search_languages: bool = False) -> Optional[str]:
+def detect_language(text: str, threshold: float = 0.3, only_search_languages: bool = False) -> str | None:
"""Detect the language of the ``text`` parameter.
:param str text: The string whose language is to be detected.
@@ -756,17 +752,17 @@ def detect_language(text: str, threshold: float = 0.3, only_search_languages: bo
"""
if not isinstance(text, str):
- raise ValueError('text must a str')
- r = _get_fasttext_model().predict(text.replace('\n', ' '), k=1, threshold=threshold)
- if isinstance(r, tuple) and len(r) == 2 and len(r[0]) > 0 and len(r[1]) > 0:
- language = r[0][0].split('__label__')[1]
+ raise ValueError('text must a str') # pyright: ignore[reportUnreachable]
+ r = _get_fasttext_model().predict(text.replace('\n', ' '), k=1, threshold=threshold) # type: ignore
+ if isinstance(r, tuple) and len(r) == 2 and len(r[0]) > 0 and len(r[1]) > 0: # type: ignore
+ language = r[0][0].split('__label__')[1] # type: ignore
if only_search_languages and language not in SEARCH_LANGUAGE_CODES:
return None
- return language
+ return language # type: ignore
return None
-def js_variable_to_python(js_variable):
+def js_variable_to_python(js_variable: str) -> str:
"""Convert a javascript variable into JSON and then load the value
It does not deal with all cases, but it is good enough for now.
@@ -838,7 +834,7 @@ def js_variable_to_python(js_variable):
# {"a": "\"12\"","b": "13"}
s = s.replace("',", "\",")
# load the JSON and return the result
- return json.loads(s)
+ return json.loads(s) # pyright: ignore[reportAny]
def parse_duration_string(duration_str: str) -> timedelta | None: