diff options
Diffstat (limited to 'searx/network/__init__.py')
| -rw-r--r-- | searx/network/__init__.py | 58 |
1 files changed, 33 insertions, 25 deletions
diff --git a/searx/network/__init__.py b/searx/network/__init__.py index 6230b9e39..070388d2e 100644 --- a/searx/network/__init__.py +++ b/searx/network/__init__.py @@ -1,13 +1,17 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # pylint: disable=missing-module-docstring, global-statement +__all__ = ["initialize", "check_network_configuration", "raise_for_httperror"] + +import typing as t + import asyncio import threading import concurrent.futures from queue import SimpleQueue from types import MethodType from timeit import default_timer -from typing import Iterable, NamedTuple, Tuple, List, Dict, Union +from collections.abc import Iterable from contextlib import contextmanager import httpx @@ -32,12 +36,12 @@ def get_time_for_thread(): return THREADLOCAL.__dict__.get('total_time') -def set_timeout_for_thread(timeout, start_time=None): +def set_timeout_for_thread(timeout: float, start_time: float | None = None): THREADLOCAL.timeout = timeout THREADLOCAL.start_time = start_time -def set_context_network_name(network_name): +def set_context_network_name(network_name: str): THREADLOCAL.network = get_network(network_name) @@ -64,9 +68,10 @@ def _record_http_time(): THREADLOCAL.total_time += time_after_request - time_before_request -def _get_timeout(start_time, kwargs): +def _get_timeout(start_time: float, kwargs): # pylint: disable=too-many-branches + timeout: float | None # timeout (httpx) if 'timeout' in kwargs: timeout = kwargs['timeout'] @@ -91,14 +96,17 @@ def request(method, url, **kwargs) -> SXNG_Response: with _record_http_time() as start_time: network = get_context_network() timeout = _get_timeout(start_time, kwargs) - future = asyncio.run_coroutine_threadsafe(network.request(method, url, **kwargs), get_loop()) + future = asyncio.run_coroutine_threadsafe( + network.request(method, url, **kwargs), + get_loop(), + ) try: return future.result(timeout) except concurrent.futures.TimeoutError as e: raise httpx.TimeoutException('Timeout', request=None) from e -def multi_requests(request_list: List["Request"]) -> List[Union[httpx.Response, Exception]]: +def multi_requests(request_list: list["Request"]) -> list[httpx.Response | Exception]: """send multiple HTTP requests in parallel. Wait for all requests to finish.""" with _record_http_time() as start_time: # send the requests @@ -124,74 +132,74 @@ def multi_requests(request_list: List["Request"]) -> List[Union[httpx.Response, return responses -class Request(NamedTuple): +class Request(t.NamedTuple): """Request description for the multi_requests function""" method: str url: str - kwargs: Dict[str, str] = {} + kwargs: dict[str, str] = {} @staticmethod - def get(url, **kwargs): + def get(url: str, **kwargs: t.Any): return Request('GET', url, kwargs) @staticmethod - def options(url, **kwargs): + def options(url: str, **kwargs: t.Any): return Request('OPTIONS', url, kwargs) @staticmethod - def head(url, **kwargs): + def head(url: str, **kwargs: t.Any): return Request('HEAD', url, kwargs) @staticmethod - def post(url, **kwargs): + def post(url: str, **kwargs: t.Any): return Request('POST', url, kwargs) @staticmethod - def put(url, **kwargs): + def put(url: str, **kwargs: t.Any): return Request('PUT', url, kwargs) @staticmethod - def patch(url, **kwargs): + def patch(url: str, **kwargs: t.Any): return Request('PATCH', url, kwargs) @staticmethod - def delete(url, **kwargs): + def delete(url: str, **kwargs: t.Any): return Request('DELETE', url, kwargs) -def get(url, **kwargs) -> SXNG_Response: +def get(url: str, **kwargs: t.Any) -> SXNG_Response: kwargs.setdefault('allow_redirects', True) return request('get', url, **kwargs) -def options(url, **kwargs) -> SXNG_Response: +def options(url: str, **kwargs: t.Any) -> SXNG_Response: kwargs.setdefault('allow_redirects', True) return request('options', url, **kwargs) -def head(url, **kwargs) -> SXNG_Response: +def head(url: str, **kwargs: t.Any) -> SXNG_Response: kwargs.setdefault('allow_redirects', False) return request('head', url, **kwargs) -def post(url, data=None, **kwargs) -> SXNG_Response: +def post(url: str, data=None, **kwargs: t.Any) -> SXNG_Response: return request('post', url, data=data, **kwargs) -def put(url, data=None, **kwargs) -> SXNG_Response: +def put(url: str, data=None, **kwargs: t.Any) -> SXNG_Response: return request('put', url, data=data, **kwargs) -def patch(url, data=None, **kwargs) -> SXNG_Response: +def patch(url: str, data=None, **kwargs: t.Any) -> SXNG_Response: return request('patch', url, data=data, **kwargs) -def delete(url, **kwargs) -> SXNG_Response: +def delete(url: str, **kwargs: t.Any) -> SXNG_Response: return request('delete', url, **kwargs) -async def stream_chunk_to_queue(network, queue, method, url, **kwargs): +async def stream_chunk_to_queue(network, queue, method: str, url: str, **kwargs: t.Any): try: async with await network.stream(method, url, **kwargs) as response: queue.put(response) @@ -217,7 +225,7 @@ async def stream_chunk_to_queue(network, queue, method, url, **kwargs): queue.put(None) -def _stream_generator(method, url, **kwargs): +def _stream_generator(method: str, url: str, **kwargs: t.Any): queue = SimpleQueue() network = get_context_network() future = asyncio.run_coroutine_threadsafe(stream_chunk_to_queue(network, queue, method, url, **kwargs), get_loop()) @@ -242,7 +250,7 @@ def _close_response_method(self): continue -def stream(method, url, **kwargs) -> Tuple[httpx.Response, Iterable[bytes]]: +def stream(method: str, url: str, **kwargs: t.Any) -> tuple[httpx.Response, Iterable[bytes]]: """Replace httpx.stream. Usage: |