diff options
| author | Markus Heiser <markus.heiser@darmarit.de> | 2025-08-22 17:17:51 +0200 |
|---|---|---|
| committer | Markus Heiser <markus.heiser@darmarIT.de> | 2025-09-03 13:37:36 +0200 |
| commit | 57b9673efb1b4fd18a3ac15e26da642201e2cd33 (patch) | |
| tree | 79d3ecd365a1669a1109aa7e5dd3636bc1041d96 /searx/network/__init__.py | |
| parent | 09500459feffa414dc7a0601bdb164464a8b0454 (diff) | |
[mod] addition of various type hints / tbc
- pyright configuration [1]_
- stub files: types-lxml [2]_
- addition of various type hints
- enable use of new type system features on older Python versions [3]_
- ``.tool-versions`` - set python to lowest version we support (3.10.18) [4]_:
Older versions typically lack some typing features found in newer Python
versions. Therefore, for local type checking (before commit), it is necessary
to use the older Python interpreter.
.. [1] https://docs.basedpyright.com/v1.20.0/configuration/config-files/
.. [2] https://pypi.org/project/types-lxml/
.. [3] https://typing-extensions.readthedocs.io/en/latest/#
.. [4] https://mise.jdx.dev/configuration.html#tool-versions
Signed-off-by: Markus Heiser <markus.heiser@darmarit.de>
Format: reST
Diffstat (limited to 'searx/network/__init__.py')
| -rw-r--r-- | searx/network/__init__.py | 58 |
1 files changed, 33 insertions, 25 deletions
diff --git a/searx/network/__init__.py b/searx/network/__init__.py index 6230b9e39..070388d2e 100644 --- a/searx/network/__init__.py +++ b/searx/network/__init__.py @@ -1,13 +1,17 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # pylint: disable=missing-module-docstring, global-statement +__all__ = ["initialize", "check_network_configuration", "raise_for_httperror"] + +import typing as t + import asyncio import threading import concurrent.futures from queue import SimpleQueue from types import MethodType from timeit import default_timer -from typing import Iterable, NamedTuple, Tuple, List, Dict, Union +from collections.abc import Iterable from contextlib import contextmanager import httpx @@ -32,12 +36,12 @@ def get_time_for_thread(): return THREADLOCAL.__dict__.get('total_time') -def set_timeout_for_thread(timeout, start_time=None): +def set_timeout_for_thread(timeout: float, start_time: float | None = None): THREADLOCAL.timeout = timeout THREADLOCAL.start_time = start_time -def set_context_network_name(network_name): +def set_context_network_name(network_name: str): THREADLOCAL.network = get_network(network_name) @@ -64,9 +68,10 @@ def _record_http_time(): THREADLOCAL.total_time += time_after_request - time_before_request -def _get_timeout(start_time, kwargs): +def _get_timeout(start_time: float, kwargs): # pylint: disable=too-many-branches + timeout: float | None # timeout (httpx) if 'timeout' in kwargs: timeout = kwargs['timeout'] @@ -91,14 +96,17 @@ def request(method, url, **kwargs) -> SXNG_Response: with _record_http_time() as start_time: network = get_context_network() timeout = _get_timeout(start_time, kwargs) - future = asyncio.run_coroutine_threadsafe(network.request(method, url, **kwargs), get_loop()) + future = asyncio.run_coroutine_threadsafe( + network.request(method, url, **kwargs), + get_loop(), + ) try: return future.result(timeout) except concurrent.futures.TimeoutError as e: raise httpx.TimeoutException('Timeout', request=None) from e -def multi_requests(request_list: List["Request"]) -> List[Union[httpx.Response, Exception]]: +def multi_requests(request_list: list["Request"]) -> list[httpx.Response | Exception]: """send multiple HTTP requests in parallel. Wait for all requests to finish.""" with _record_http_time() as start_time: # send the requests @@ -124,74 +132,74 @@ def multi_requests(request_list: List["Request"]) -> List[Union[httpx.Response, return responses -class Request(NamedTuple): +class Request(t.NamedTuple): """Request description for the multi_requests function""" method: str url: str - kwargs: Dict[str, str] = {} + kwargs: dict[str, str] = {} @staticmethod - def get(url, **kwargs): + def get(url: str, **kwargs: t.Any): return Request('GET', url, kwargs) @staticmethod - def options(url, **kwargs): + def options(url: str, **kwargs: t.Any): return Request('OPTIONS', url, kwargs) @staticmethod - def head(url, **kwargs): + def head(url: str, **kwargs: t.Any): return Request('HEAD', url, kwargs) @staticmethod - def post(url, **kwargs): + def post(url: str, **kwargs: t.Any): return Request('POST', url, kwargs) @staticmethod - def put(url, **kwargs): + def put(url: str, **kwargs: t.Any): return Request('PUT', url, kwargs) @staticmethod - def patch(url, **kwargs): + def patch(url: str, **kwargs: t.Any): return Request('PATCH', url, kwargs) @staticmethod - def delete(url, **kwargs): + def delete(url: str, **kwargs: t.Any): return Request('DELETE', url, kwargs) -def get(url, **kwargs) -> SXNG_Response: +def get(url: str, **kwargs: t.Any) -> SXNG_Response: kwargs.setdefault('allow_redirects', True) return request('get', url, **kwargs) -def options(url, **kwargs) -> SXNG_Response: +def options(url: str, **kwargs: t.Any) -> SXNG_Response: kwargs.setdefault('allow_redirects', True) return request('options', url, **kwargs) -def head(url, **kwargs) -> SXNG_Response: +def head(url: str, **kwargs: t.Any) -> SXNG_Response: kwargs.setdefault('allow_redirects', False) return request('head', url, **kwargs) -def post(url, data=None, **kwargs) -> SXNG_Response: +def post(url: str, data=None, **kwargs: t.Any) -> SXNG_Response: return request('post', url, data=data, **kwargs) -def put(url, data=None, **kwargs) -> SXNG_Response: +def put(url: str, data=None, **kwargs: t.Any) -> SXNG_Response: return request('put', url, data=data, **kwargs) -def patch(url, data=None, **kwargs) -> SXNG_Response: +def patch(url: str, data=None, **kwargs: t.Any) -> SXNG_Response: return request('patch', url, data=data, **kwargs) -def delete(url, **kwargs) -> SXNG_Response: +def delete(url: str, **kwargs: t.Any) -> SXNG_Response: return request('delete', url, **kwargs) -async def stream_chunk_to_queue(network, queue, method, url, **kwargs): +async def stream_chunk_to_queue(network, queue, method: str, url: str, **kwargs: t.Any): try: async with await network.stream(method, url, **kwargs) as response: queue.put(response) @@ -217,7 +225,7 @@ async def stream_chunk_to_queue(network, queue, method, url, **kwargs): queue.put(None) -def _stream_generator(method, url, **kwargs): +def _stream_generator(method: str, url: str, **kwargs: t.Any): queue = SimpleQueue() network = get_context_network() future = asyncio.run_coroutine_threadsafe(stream_chunk_to_queue(network, queue, method, url, **kwargs), get_loop()) @@ -242,7 +250,7 @@ def _close_response_method(self): continue -def stream(method, url, **kwargs) -> Tuple[httpx.Response, Iterable[bytes]]: +def stream(method: str, url: str, **kwargs: t.Any) -> tuple[httpx.Response, Iterable[bytes]]: """Replace httpx.stream. Usage: |