summaryrefslogtreecommitdiff
path: root/searx/network/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'searx/network/__init__.py')
-rw-r--r--searx/network/__init__.py58
1 files changed, 33 insertions, 25 deletions
diff --git a/searx/network/__init__.py b/searx/network/__init__.py
index 6230b9e39..070388d2e 100644
--- a/searx/network/__init__.py
+++ b/searx/network/__init__.py
@@ -1,13 +1,17 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=missing-module-docstring, global-statement
+__all__ = ["initialize", "check_network_configuration", "raise_for_httperror"]
+
+import typing as t
+
import asyncio
import threading
import concurrent.futures
from queue import SimpleQueue
from types import MethodType
from timeit import default_timer
-from typing import Iterable, NamedTuple, Tuple, List, Dict, Union
+from collections.abc import Iterable
from contextlib import contextmanager
import httpx
@@ -32,12 +36,12 @@ def get_time_for_thread():
return THREADLOCAL.__dict__.get('total_time')
-def set_timeout_for_thread(timeout, start_time=None):
+def set_timeout_for_thread(timeout: float, start_time: float | None = None):
THREADLOCAL.timeout = timeout
THREADLOCAL.start_time = start_time
-def set_context_network_name(network_name):
+def set_context_network_name(network_name: str):
THREADLOCAL.network = get_network(network_name)
@@ -64,9 +68,10 @@ def _record_http_time():
THREADLOCAL.total_time += time_after_request - time_before_request
-def _get_timeout(start_time, kwargs):
+def _get_timeout(start_time: float, kwargs):
# pylint: disable=too-many-branches
+ timeout: float | None
# timeout (httpx)
if 'timeout' in kwargs:
timeout = kwargs['timeout']
@@ -91,14 +96,17 @@ def request(method, url, **kwargs) -> SXNG_Response:
with _record_http_time() as start_time:
network = get_context_network()
timeout = _get_timeout(start_time, kwargs)
- future = asyncio.run_coroutine_threadsafe(network.request(method, url, **kwargs), get_loop())
+ future = asyncio.run_coroutine_threadsafe(
+ network.request(method, url, **kwargs),
+ get_loop(),
+ )
try:
return future.result(timeout)
except concurrent.futures.TimeoutError as e:
raise httpx.TimeoutException('Timeout', request=None) from e
-def multi_requests(request_list: List["Request"]) -> List[Union[httpx.Response, Exception]]:
+def multi_requests(request_list: list["Request"]) -> list[httpx.Response | Exception]:
"""send multiple HTTP requests in parallel. Wait for all requests to finish."""
with _record_http_time() as start_time:
# send the requests
@@ -124,74 +132,74 @@ def multi_requests(request_list: List["Request"]) -> List[Union[httpx.Response,
return responses
-class Request(NamedTuple):
+class Request(t.NamedTuple):
"""Request description for the multi_requests function"""
method: str
url: str
- kwargs: Dict[str, str] = {}
+ kwargs: dict[str, str] = {}
@staticmethod
- def get(url, **kwargs):
+ def get(url: str, **kwargs: t.Any):
return Request('GET', url, kwargs)
@staticmethod
- def options(url, **kwargs):
+ def options(url: str, **kwargs: t.Any):
return Request('OPTIONS', url, kwargs)
@staticmethod
- def head(url, **kwargs):
+ def head(url: str, **kwargs: t.Any):
return Request('HEAD', url, kwargs)
@staticmethod
- def post(url, **kwargs):
+ def post(url: str, **kwargs: t.Any):
return Request('POST', url, kwargs)
@staticmethod
- def put(url, **kwargs):
+ def put(url: str, **kwargs: t.Any):
return Request('PUT', url, kwargs)
@staticmethod
- def patch(url, **kwargs):
+ def patch(url: str, **kwargs: t.Any):
return Request('PATCH', url, kwargs)
@staticmethod
- def delete(url, **kwargs):
+ def delete(url: str, **kwargs: t.Any):
return Request('DELETE', url, kwargs)
-def get(url, **kwargs) -> SXNG_Response:
+def get(url: str, **kwargs: t.Any) -> SXNG_Response:
kwargs.setdefault('allow_redirects', True)
return request('get', url, **kwargs)
-def options(url, **kwargs) -> SXNG_Response:
+def options(url: str, **kwargs: t.Any) -> SXNG_Response:
kwargs.setdefault('allow_redirects', True)
return request('options', url, **kwargs)
-def head(url, **kwargs) -> SXNG_Response:
+def head(url: str, **kwargs: t.Any) -> SXNG_Response:
kwargs.setdefault('allow_redirects', False)
return request('head', url, **kwargs)
-def post(url, data=None, **kwargs) -> SXNG_Response:
+def post(url: str, data=None, **kwargs: t.Any) -> SXNG_Response:
return request('post', url, data=data, **kwargs)
-def put(url, data=None, **kwargs) -> SXNG_Response:
+def put(url: str, data=None, **kwargs: t.Any) -> SXNG_Response:
return request('put', url, data=data, **kwargs)
-def patch(url, data=None, **kwargs) -> SXNG_Response:
+def patch(url: str, data=None, **kwargs: t.Any) -> SXNG_Response:
return request('patch', url, data=data, **kwargs)
-def delete(url, **kwargs) -> SXNG_Response:
+def delete(url: str, **kwargs: t.Any) -> SXNG_Response:
return request('delete', url, **kwargs)
-async def stream_chunk_to_queue(network, queue, method, url, **kwargs):
+async def stream_chunk_to_queue(network, queue, method: str, url: str, **kwargs: t.Any):
try:
async with await network.stream(method, url, **kwargs) as response:
queue.put(response)
@@ -217,7 +225,7 @@ async def stream_chunk_to_queue(network, queue, method, url, **kwargs):
queue.put(None)
-def _stream_generator(method, url, **kwargs):
+def _stream_generator(method: str, url: str, **kwargs: t.Any):
queue = SimpleQueue()
network = get_context_network()
future = asyncio.run_coroutine_threadsafe(stream_chunk_to_queue(network, queue, method, url, **kwargs), get_loop())
@@ -242,7 +250,7 @@ def _close_response_method(self):
continue
-def stream(method, url, **kwargs) -> Tuple[httpx.Response, Iterable[bytes]]:
+def stream(method: str, url: str, **kwargs: t.Any) -> tuple[httpx.Response, Iterable[bytes]]:
"""Replace httpx.stream.
Usage: