summaryrefslogtreecommitdiff
path: root/searx/botdetection
diff options
context:
space:
mode:
authorMarkus Heiser <markus.heiser@darmarit.de>2025-08-22 17:17:51 +0200
committerMarkus Heiser <markus.heiser@darmarIT.de>2025-09-03 13:37:36 +0200
commit57b9673efb1b4fd18a3ac15e26da642201e2cd33 (patch)
tree79d3ecd365a1669a1109aa7e5dd3636bc1041d96 /searx/botdetection
parent09500459feffa414dc7a0601bdb164464a8b0454 (diff)
[mod] addition of various type hints / tbc
- pyright configuration [1]_ - stub files: types-lxml [2]_ - addition of various type hints - enable use of new type system features on older Python versions [3]_ - ``.tool-versions`` - set python to lowest version we support (3.10.18) [4]_: Older versions typically lack some typing features found in newer Python versions. Therefore, for local type checking (before commit), it is necessary to use the older Python interpreter. .. [1] https://docs.basedpyright.com/v1.20.0/configuration/config-files/ .. [2] https://pypi.org/project/types-lxml/ .. [3] https://typing-extensions.readthedocs.io/en/latest/# .. [4] https://mise.jdx.dev/configuration.html#tool-versions Signed-off-by: Markus Heiser <markus.heiser@darmarit.de> Format: reST
Diffstat (limited to 'searx/botdetection')
-rw-r--r--searx/botdetection/_helpers.py4
-rw-r--r--searx/botdetection/config.py65
-rw-r--r--searx/botdetection/trusted_proxies.py5
3 files changed, 39 insertions, 35 deletions
diff --git a/searx/botdetection/_helpers.py b/searx/botdetection/_helpers.py
index 72af693c1..19f5db36a 100644
--- a/searx/botdetection/_helpers.py
+++ b/searx/botdetection/_helpers.py
@@ -53,7 +53,7 @@ def too_many_requests(network: IPv4Network | IPv6Network, log_msg: str) -> werkz
return flask.make_response(('Too Many Requests', 429))
-def get_network(real_ip: IPv4Address | IPv6Address, cfg: config.Config) -> IPv4Network | IPv6Network:
+def get_network(real_ip: IPv4Address | IPv6Address, cfg: "config.Config") -> IPv4Network | IPv6Network:
"""Returns the (client) network of whether the ``real_ip`` is part of.
The ``ipv4_prefix`` and ``ipv6_prefix`` define the number of leading bits in
@@ -71,7 +71,7 @@ def get_network(real_ip: IPv4Address | IPv6Address, cfg: config.Config) -> IPv4N
prefix: int = cfg["botdetection.ipv4_prefix"]
if real_ip.version == 6:
- prefix: int = cfg["botdetection.ipv6_prefix"]
+ prefix = cfg["botdetection.ipv6_prefix"]
network = ip_network(f"{real_ip}/{prefix}", strict=False)
# logger.debug("get_network(): %s", network.compressed)
return network
diff --git a/searx/botdetection/config.py b/searx/botdetection/config.py
index 6b35df84f..ad86f7884 100644
--- a/searx/botdetection/config.py
+++ b/searx/botdetection/config.py
@@ -19,26 +19,27 @@ __all__ = ['Config', 'UNSET', 'SchemaIssue', 'set_global_cfg', 'get_global_cfg']
log = logging.getLogger(__name__)
-CFG: Config | None = None
+CFG: "Config | None" = None
"""Global config of the botdetection."""
-def set_global_cfg(cfg: Config):
+def set_global_cfg(cfg: "Config"):
global CFG # pylint: disable=global-statement
CFG = cfg
-def get_global_cfg() -> Config:
+def get_global_cfg() -> "Config":
if CFG is None:
raise ValueError("Botdetection's config is not yet initialized.")
return CFG
+@typing.final
class FALSE:
"""Class of ``False`` singleton"""
# pylint: disable=multiple-statements
- def __init__(self, msg):
+ def __init__(self, msg: str):
self.msg = msg
def __bool__(self):
@@ -53,6 +54,7 @@ class FALSE:
UNSET = FALSE('<UNSET>')
+@typing.final
class SchemaIssue(ValueError):
"""Exception to store and/or raise a message from a schema issue."""
@@ -67,10 +69,10 @@ class SchemaIssue(ValueError):
class Config:
"""Base class used for configuration"""
- UNSET = UNSET
+ UNSET: object = UNSET
@classmethod
- def from_toml(cls, schema_file: pathlib.Path, cfg_file: pathlib.Path, deprecated: dict[str, str]) -> Config:
+ def from_toml(cls, schema_file: pathlib.Path, cfg_file: pathlib.Path, deprecated: dict[str, str]) -> "Config":
# init schema
@@ -102,9 +104,9 @@ class Config:
These values are needed for validation, see :py:obj:`validate`.
"""
- self.cfg_schema = cfg_schema
- self.deprecated = deprecated
- self.cfg = copy.deepcopy(cfg_schema)
+ self.cfg_schema: dict[str, typing.Any] = cfg_schema
+ self.deprecated: dict[str, str] = deprecated
+ self.cfg: dict[str, typing.Any] = copy.deepcopy(cfg_schema)
def __getitem__(self, key: str) -> typing.Any:
return self.get(key)
@@ -115,7 +117,7 @@ class Config:
return validate(self.cfg_schema, cfg, self.deprecated)
- def update(self, upd_cfg: dict):
+ def update(self, upd_cfg: dict[str, typing.Any]):
"""Update this configuration by ``upd_cfg``."""
dict_deepupdate(self.cfg, upd_cfg)
@@ -142,7 +144,7 @@ class Config:
val = val % self
return val
- def set(self, name: str, val):
+ def set(self, name: str, val: typing.Any):
"""Set the value to which ``name`` points in the configuration.
If there is no such ``name`` in the config, a :py:obj:`KeyError` is
@@ -151,17 +153,17 @@ class Config:
parent = self._get_parent_dict(name)
parent[name.split('.')[-1]] = val
- def _get_parent_dict(self, name):
+ def _get_parent_dict(self, name: str) -> dict[str, typing.Any]:
parent_name = '.'.join(name.split('.')[:-1])
if parent_name:
- parent = value(parent_name, self.cfg)
+ parent: dict[str, typing.Any] = value(parent_name, self.cfg)
else:
parent = self.cfg
if (parent is UNSET) or (not isinstance(parent, dict)):
raise KeyError(parent_name)
return parent
- def path(self, name: str, default=UNSET):
+ def path(self, name: str, default: typing.Any = UNSET):
"""Get a :py:class:`pathlib.Path` object from a config string."""
val = self.get(name, default)
@@ -171,7 +173,7 @@ class Config:
return default
return pathlib.Path(str(val))
- def pyobj(self, name, default=UNSET):
+ def pyobj(self, name: str, default: typing.Any = UNSET):
"""Get python object referred by full qualiffied name (FQN) in the config
string."""
@@ -185,7 +187,7 @@ class Config:
return getattr(m, name)
-def toml_load(file_name):
+def toml_load(file_name: str | pathlib.Path):
try:
with open(file_name, "rb") as f:
return tomllib.load(f)
@@ -198,7 +200,7 @@ def toml_load(file_name):
# working with dictionaries
-def value(name: str, data_dict: dict):
+def value(name: str, data_dict: dict[str, typing.Any]):
"""Returns the value to which ``name`` points in the ``dat_dict``.
.. code: python
@@ -228,7 +230,7 @@ def value(name: str, data_dict: dict):
def validate(
schema_dict: dict[str, typing.Any], data_dict: dict[str, typing.Any], deprecated: dict[str, str]
-) -> tuple[bool, list[str]]:
+) -> tuple[bool, list[SchemaIssue]]:
"""Deep validation of dictionary in ``data_dict`` against dictionary in
``schema_dict``. Argument deprecated is a dictionary that maps deprecated
configuration names to a messages::
@@ -254,9 +256,9 @@ def validate(
:py:obj:`SchemaIssue` is raised.
"""
- names = []
- is_valid = True
- issue_list = []
+ names: list[str] = []
+ is_valid: bool = True
+ issue_list: list[SchemaIssue] = []
if not isinstance(schema_dict, dict):
raise SchemaIssue('invalid', "schema_dict is not a dict type")
@@ -268,15 +270,16 @@ def validate(
def _validate(
- names: typing.List,
- issue_list: typing.List,
- schema_dict: typing.Dict,
- data_dict: typing.Dict,
- deprecated: typing.Dict[str, str],
-) -> typing.Tuple[bool, typing.List]:
+ names: list[str],
+ issue_list: list[SchemaIssue],
+ schema_dict: dict[str, typing.Any],
+ data_dict: dict[str, typing.Any],
+ deprecated: dict[str, str],
+) -> tuple[bool, list[SchemaIssue]]:
is_valid = True
+ data_value: dict[str, typing.Any]
for key, data_value in data_dict.items():
names.append(key)
@@ -311,7 +314,7 @@ def _validate(
return is_valid, issue_list
-def dict_deepupdate(base_dict: dict, upd_dict: dict, names=None):
+def dict_deepupdate(base_dict: dict[str, typing.Any], upd_dict: dict[str, typing.Any], names: list[str] | None = None):
"""Deep-update of dictionary in ``base_dict`` by dictionary in ``upd_dict``.
For each ``upd_key`` & ``upd_val`` pair in ``upd_dict``:
@@ -350,7 +353,7 @@ def dict_deepupdate(base_dict: dict, upd_dict: dict, names=None):
raise TypeError(f"type mismatch {'.'.join(names)}: is not a dict type in base_dict")
dict_deepupdate(
base_dict[upd_key],
- upd_val,
+ upd_val, # pyright: ignore[reportUnknownArgumentType]
names
+ [
upd_key,
@@ -359,7 +362,7 @@ def dict_deepupdate(base_dict: dict, upd_dict: dict, names=None):
else:
# if base_dict[upd_key] not exist, set base_dict[upd_key] from deepcopy of upd_val
- base_dict[upd_key] = copy.deepcopy(upd_val)
+ base_dict[upd_key] = copy.deepcopy(upd_val) # pyright: ignore[reportUnknownArgumentType]
elif isinstance(upd_val, list):
@@ -373,7 +376,7 @@ def dict_deepupdate(base_dict: dict, upd_dict: dict, names=None):
else:
# if base_dict[upd_key] doesn't exists, set base_dict[key] from a deepcopy of the
# list in upd_val.
- base_dict[upd_key] = copy.deepcopy(upd_val)
+ base_dict[upd_key] = copy.deepcopy(upd_val) # pyright: ignore[reportUnknownArgumentType]
elif isinstance(upd_val, set):
diff --git a/searx/botdetection/trusted_proxies.py b/searx/botdetection/trusted_proxies.py
index ae2945af3..39a60997b 100644
--- a/searx/botdetection/trusted_proxies.py
+++ b/searx/botdetection/trusted_proxies.py
@@ -19,6 +19,7 @@ if t.TYPE_CHECKING:
from _typeshed.wsgi import WSGIEnvironment
+@t.final
class ProxyFix:
"""A middleware like the ProxyFix_ class, where the ``x_for`` argument is
replaced by a method that determines the number of trusted proxies via the
@@ -54,7 +55,7 @@ class ProxyFix:
"""
- def __init__(self, wsgi_app: WSGIApplication) -> None:
+ def __init__(self, wsgi_app: "WSGIApplication") -> None:
self.wsgi_app = wsgi_app
def trusted_proxies(self) -> list[IPv4Network | IPv6Network]:
@@ -84,7 +85,7 @@ class ProxyFix:
# fallback to first address
return x_forwarded_for[0].compressed
- def __call__(self, environ: WSGIEnvironment, start_response: StartResponse) -> abc.Iterable[bytes]:
+ def __call__(self, environ: "WSGIEnvironment", start_response: "StartResponse") -> abc.Iterable[bytes]:
# pylint: disable=too-many-statements
trusted_proxies = self.trusted_proxies()