diff options
Diffstat (limited to 'searx/network')
| -rw-r--r-- | searx/network/__init__.py | 8 | ||||
| -rw-r--r-- | searx/network/client.py | 121 | ||||
| -rw-r--r-- | searx/network/network.py | 45 |
3 files changed, 90 insertions, 84 deletions
diff --git a/searx/network/__init__.py b/searx/network/__init__.py index 21c4c27b5..260d4f105 100644 --- a/searx/network/__init__.py +++ b/searx/network/__init__.py @@ -43,24 +43,20 @@ THREADLOCAL = threading.local() """Thread-local data is data for thread specific values.""" def reset_time_for_thread(): - global THREADLOCAL THREADLOCAL.total_time = 0 def get_time_for_thread(): """returns thread's total time or None""" - global THREADLOCAL return THREADLOCAL.__dict__.get('total_time') def set_timeout_for_thread(timeout, start_time=None): - global THREADLOCAL THREADLOCAL.timeout = timeout THREADLOCAL.start_time = start_time def set_context_network_name(network_name): - global THREADLOCAL THREADLOCAL.network = get_network(network_name) @@ -69,13 +65,11 @@ def get_context_network(): If unset, return value from :py:obj:`get_network`. """ - global THREADLOCAL return THREADLOCAL.__dict__.get('network') or get_network() def request(method, url, **kwargs): """same as requests/requests/api.py request(...)""" - global THREADLOCAL time_before_request = default_timer() # timeout (httpx) @@ -172,7 +166,7 @@ async def stream_chunk_to_queue(network, queue, method, url, **kwargs): async for chunk in response.aiter_raw(65536): if len(chunk) > 0: queue.put(chunk) - except httpx.ResponseClosed: + except httpx.StreamClosed: # the response was queued before the exception. # the exception was raised on aiter_raw. # we do nothing here: in the finally block, None will be queued diff --git a/searx/network/client.py b/searx/network/client.py index 187ae5366..925c0fdd3 100644 --- a/searx/network/client.py +++ b/searx/network/client.py @@ -5,6 +5,7 @@ import asyncio import logging import threading + import httpcore import httpx from httpx_socks import AsyncProxyTransport @@ -26,19 +27,22 @@ else: uvloop.install() -logger = logger.getChild('searx.http.client') +logger = logger.getChild('searx.network.client') LOOP = None SSLCONTEXTS = {} TRANSPORT_KWARGS = { - 'backend': 'asyncio', + # use anyio : + # * https://github.com/encode/httpcore/issues/344 + # * https://github.com/encode/httpx/discussions/1511 + 'backend': 'anyio', 'trust_env': False, } # pylint: disable=protected-access async def close_connections_for_url( - connection_pool: httpcore.AsyncConnectionPool, - url: httpcore._utils.URL ): + connection_pool: httpcore.AsyncConnectionPool, url: httpcore._utils.URL +): origin = httpcore._utils.url_to_origin(url) logger.debug('Drop connections for %r', origin) @@ -47,97 +51,93 @@ async def close_connections_for_url( await connection_pool._remove_from_pool(connection) try: await connection.aclose() - except httpcore.NetworkError as e: + except httpx.NetworkError as e: logger.warning('Error closing an existing connection', exc_info=e) # pylint: enable=protected-access def get_sslcontexts(proxy_url=None, cert=None, verify=True, trust_env=True, http2=False): - global SSLCONTEXTS key = (proxy_url, cert, verify, trust_env, http2) if key not in SSLCONTEXTS: SSLCONTEXTS[key] = httpx.create_ssl_context(cert, verify, trust_env, http2) return SSLCONTEXTS[key] -class AsyncHTTPTransportNoHttp(httpcore.AsyncHTTPTransport): +class AsyncHTTPTransportNoHttp(httpx.AsyncHTTPTransport): """Block HTTP request""" - async def arequest(self, method, url, headers=None, stream=None, ext=None): - raise httpcore.UnsupportedProtocol("HTTP protocol is disabled") + async def handle_async_request( + self, method, url, headers=None, stream=None, extensions=None + ): + raise httpx.UnsupportedProtocol('HTTP protocol is disabled') class AsyncProxyTransportFixed(AsyncProxyTransport): """Fix httpx_socks.AsyncProxyTransport - Map python_socks exceptions to httpcore.ProxyError - - Map socket.gaierror to httpcore.ConnectError + Map python_socks exceptions to httpx.ProxyError / httpx.ConnectError - Note: keepalive_expiry is ignored, AsyncProxyTransport should call: - * self._keepalive_sweep() - * self._response_closed(self, connection) + Map socket.gaierror to httpx.ConnectError Note: AsyncProxyTransport inherit from AsyncConnectionPool - - Note: the API is going to change on httpx 0.18.0 - see https://github.com/encode/httpx/pull/1522 """ - async def arequest(self, method, url, headers=None, stream=None, ext=None): + async def handle_async_request( + self, method, url, headers=None, stream=None, extensions=None + ): retry = 2 while retry > 0: retry -= 1 try: - return await super().arequest(method, url, headers, stream, ext) + return await super().handle_async_request( + method, url, headers=headers, stream=stream, extensions=extensions + ) except (ProxyConnectionError, ProxyTimeoutError, ProxyError) as e: - raise httpcore.ProxyError(e) + raise httpx.ProxyError from e except OSError as e: # socket.gaierror when DNS resolution fails - raise httpcore.NetworkError(e) - except httpcore.RemoteProtocolError as e: - # in case of httpcore.RemoteProtocolError: Server disconnected - await close_connections_for_url(self, url) - logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e) - # retry - except (httpcore.NetworkError, httpcore.ProtocolError) as e: - # httpcore.WriteError on HTTP/2 connection leaves a new opened stream + raise httpx.ConnectError from e + except httpx.NetworkError as e: + # httpx.WriteError on HTTP/2 connection leaves a new opened stream # then each new request creates a new stream and raise the same WriteError await close_connections_for_url(self, url) raise e + except httpx.RemoteProtocolError as e: + # in case of httpx.RemoteProtocolError: Server disconnected + await close_connections_for_url(self, url) + logger.warning('httpx.RemoteProtocolError: retry', exc_info=e) + # retry class AsyncHTTPTransportFixed(httpx.AsyncHTTPTransport): """Fix httpx.AsyncHTTPTransport""" - async def arequest(self, method, url, headers=None, stream=None, ext=None): + async def handle_async_request( + self, method, url, headers=None, stream=None, extensions=None + ): retry = 2 while retry > 0: retry -= 1 try: - return await super().arequest(method, url, headers, stream, ext) + return await super().handle_async_request( + method, url, headers=headers, stream=stream, extensions=extensions + ) except OSError as e: # socket.gaierror when DNS resolution fails - raise httpcore.ConnectError(e) - except httpcore.CloseError as e: - # httpcore.CloseError: [Errno 104] Connection reset by peer - # raised by _keepalive_sweep() - # from https://github.com/encode/httpcore/blob/4b662b5c42378a61e54d673b4c949420102379f5/httpcore/_backends/asyncio.py#L198 # pylint: disable=line-too-long + raise httpx.ConnectError from e + except httpx.NetworkError as e: + # httpx.WriteError on HTTP/2 connection leaves a new opened stream + # then each new request creates a new stream and raise the same WriteError await close_connections_for_url(self._pool, url) - logger.warning('httpcore.CloseError: retry', exc_info=e) - # retry - except httpcore.RemoteProtocolError as e: - # in case of httpcore.RemoteProtocolError: Server disconnected + raise e + except httpx.RemoteProtocolError as e: + # in case of httpx.RemoteProtocolError: Server disconnected await close_connections_for_url(self._pool, url) - logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e) + logger.warning('httpx.RemoteProtocolError: retry', exc_info=e) # retry - except (httpcore.ProtocolError, httpcore.NetworkError) as e: - await close_connections_for_url(self._pool, url) - raise e def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries): - global TRANSPORT_KWARGS # support socks5h (requests compatibility): # https://requests.readthedocs.io/en/master/user/advanced/#socks # socks5:// hostname is resolved on client side @@ -167,7 +167,6 @@ def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit def get_transport(verify, http2, local_address, proxy_url, limit, retries): - global TRANSPORT_KWARGS verify = get_sslcontexts(None, None, True, False, http2) if verify is True else verify return AsyncHTTPTransportFixed( # pylint: disable=protected-access @@ -181,20 +180,11 @@ def get_transport(verify, http2, local_address, proxy_url, limit, retries): ) -def iter_proxies(proxies): - # https://www.python-httpx.org/compatibility/#proxy-keys - if isinstance(proxies, str): - yield 'all://', proxies - elif isinstance(proxies, dict): - for pattern, proxy_url in proxies.items(): - yield pattern, proxy_url - - def new_client( # pylint: disable=too-many-arguments enable_http, verify, enable_http2, max_connections, max_keepalive_connections, keepalive_expiry, - proxies, local_address, retries, max_redirects ): + proxies, local_address, retries, max_redirects, hook_log_response ): limit = httpx.Limits( max_connections=max_connections, max_keepalive_connections=max_keepalive_connections, @@ -202,8 +192,8 @@ def new_client( ) # See https://www.python-httpx.org/advanced/#routing mounts = {} - for pattern, proxy_url in iter_proxies(proxies): - if not enable_http and (pattern == 'http' or pattern.startswith('http://')): + for pattern, proxy_url in proxies.items(): + if not enable_http and pattern.startswith('http://'): continue if (proxy_url.startswith('socks4://') or proxy_url.startswith('socks5://') @@ -221,17 +211,26 @@ def new_client( mounts['http://'] = AsyncHTTPTransportNoHttp() transport = get_transport(verify, enable_http2, local_address, None, limit, retries) - return httpx.AsyncClient(transport=transport, mounts=mounts, max_redirects=max_redirects) + + event_hooks = None + if hook_log_response: + event_hooks = {'response': [ hook_log_response ]} + + return httpx.AsyncClient( + transport=transport, + mounts=mounts, + max_redirects=max_redirects, + event_hooks=event_hooks, + ) def get_loop(): - global LOOP return LOOP def init(): # log - for logger_name in ('hpack.hpack', 'hpack.table'): + for logger_name in ('hpack.hpack', 'hpack.table', 'httpx._client'): logging.getLogger(logger_name).setLevel(logging.WARNING) # loop diff --git a/searx/network/network.py b/searx/network/network.py index d09a2ee0e..d58070f18 100644 --- a/searx/network/network.py +++ b/searx/network/network.py @@ -10,9 +10,11 @@ from itertools import cycle import httpx +from searx import logger, searx_debug from .client import new_client, get_loop +logger = logger.getChild('network') DEFAULT_NAME = '__DEFAULT__' NETWORKS = {} # requests compatibility when reading proxy settings from settings.yml @@ -41,7 +43,7 @@ class Network: 'enable_http', 'verify', 'enable_http2', 'max_connections', 'max_keepalive_connections', 'keepalive_expiry', 'local_addresses', 'proxies', 'max_redirects', 'retries', 'retry_on_http_error', - '_local_addresses_cycle', '_proxies_cycle', '_clients' + '_local_addresses_cycle', '_proxies_cycle', '_clients', '_logger' ) def __init__( @@ -57,7 +59,8 @@ class Network: local_addresses=None, retries=0, retry_on_http_error=None, - max_redirects=30 ): + max_redirects=30, + logger_name=None): self.enable_http = enable_http self.verify = verify @@ -73,6 +76,7 @@ class Network: self._local_addresses_cycle = self.get_ipaddress_cycle() self._proxies_cycle = self.get_proxy_cycles() self._clients = {} + self._logger = logger.getChild(logger_name) if logger_name else logger self.check_parameters() def check_parameters(self): @@ -130,12 +134,23 @@ class Network: # pylint: disable=stop-iteration-return yield tuple((pattern, next(proxy_url_cycle)) for pattern, proxy_url_cycle in proxy_settings.items()) + async def log_response(self, response: httpx.Response): + request = response.request + status = f"{response.status_code} {response.reason_phrase}" + response_line = f"{response.http_version} {status}" + content_type = response.headers.get("Content-Type") + content_type = f' ({content_type})' if content_type else '' + self._logger.debug( + f'HTTP Request: {request.method} {request.url} "{response_line}"{content_type}' + ) + def get_client(self, verify=None, max_redirects=None): verify = self.verify if verify is None else verify max_redirects = self.max_redirects if max_redirects is None else max_redirects local_address = next(self._local_addresses_cycle) proxies = next(self._proxies_cycle) # is a tuple so it can be part of the key key = (verify, max_redirects, local_address, proxies) + hook_log_response = self.log_response if searx_debug else None if key not in self._clients or self._clients[key].is_closed: self._clients[key] = new_client( self.enable_http, @@ -147,7 +162,8 @@ class Network: dict(proxies), local_address, 0, - max_redirects + max_redirects, + hook_log_response ) return self._clients[key] @@ -207,12 +223,10 @@ class Network: @classmethod async def aclose_all(cls): - global NETWORKS await asyncio.gather(*[network.aclose() for network in NETWORKS.values()], return_exceptions=False) def get_network(name=None): - global NETWORKS return NETWORKS.get(name or DEFAULT_NAME) @@ -222,8 +236,6 @@ def initialize(settings_engines=None, settings_outgoing=None): from searx import settings # pylint: enable=import-outside-toplevel) - global NETWORKS - settings_engines = settings_engines or settings['engines'] settings_outgoing = settings_outgoing or settings['outgoing'] @@ -243,11 +255,13 @@ def initialize(settings_engines=None, settings_outgoing=None): 'retry_on_http_error': None, } - def new_network(params): + def new_network(params, logger_name=None): nonlocal default_params result = {} result.update(default_params) result.update(params) + if logger_name: + result['logger_name'] = logger_name return Network(**result) def iter_networks(): @@ -263,13 +277,13 @@ def initialize(settings_engines=None, settings_outgoing=None): if NETWORKS: done() NETWORKS.clear() - NETWORKS[DEFAULT_NAME] = new_network({}) - NETWORKS['ipv4'] = new_network({'local_addresses': '0.0.0.0'}) - NETWORKS['ipv6'] = new_network({'local_addresses': '::'}) + NETWORKS[DEFAULT_NAME] = new_network({}, logger_name='default') + NETWORKS['ipv4'] = new_network({'local_addresses': '0.0.0.0'}, logger_name='ipv4') + NETWORKS['ipv6'] = new_network({'local_addresses': '::'}, logger_name='ipv6') # define networks from outgoing.networks for network_name, network in settings_outgoing['networks'].items(): - NETWORKS[network_name] = new_network(network) + NETWORKS[network_name] = new_network(network, logger_name=network_name) # define networks from engines.[i].network (except references) for engine_name, engine, network in iter_networks(): @@ -280,9 +294,9 @@ def initialize(settings_engines=None, settings_outgoing=None): network[attribute_name] = getattr(engine, attribute_name) else: network[attribute_name] = attribute_value - NETWORKS[engine_name] = new_network(network) + NETWORKS[engine_name] = new_network(network, logger_name=engine_name) elif isinstance(network, dict): - NETWORKS[engine_name] = new_network(network) + NETWORKS[engine_name] = new_network(network, logger_name=engine_name) # define networks from engines.[i].network (references) for engine_name, engine, network in iter_networks(): @@ -295,7 +309,7 @@ def initialize(settings_engines=None, settings_outgoing=None): if 'image_proxy' not in NETWORKS: image_proxy_params = default_params.copy() image_proxy_params['enable_http2'] = False - NETWORKS['image_proxy'] = new_network(image_proxy_params) + NETWORKS['image_proxy'] = new_network(image_proxy_params, logger_name='image_proxy') @atexit.register @@ -308,7 +322,6 @@ def done(): Note: since Network.aclose has to be async, it is not possible to call this method on Network.__del__ So Network.aclose is called here using atexit.register """ - global NETWORKS try: loop = get_loop() if loop: |