From 43fcaa642a63d75096b33d44ce7f7c0de1bce614 Mon Sep 17 00:00:00 2001 From: Alexandre Flament Date: Sat, 14 Aug 2021 19:36:30 +0200 Subject: [fix] image_proxy: always close the httpx respone previously, when the content type was not an image and some other error, the httpx response was not closed --- searx/network/__init__.py | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) (limited to 'searx/network/__init__.py') diff --git a/searx/network/__init__.py b/searx/network/__init__.py index aaebb7928..9e80a30a1 100644 --- a/searx/network/__init__.py +++ b/searx/network/__init__.py @@ -5,6 +5,7 @@ import asyncio import threading import concurrent.futures +from types import MethodType from timeit import default_timer import httpx @@ -161,6 +162,7 @@ def patch(url, data=None, **kwargs): def delete(url, **kwargs): return request('delete', url, **kwargs) + async def stream_chunk_to_queue(network, queue, method, url, **kwargs): try: async with network.stream(method, url, **kwargs) as response: @@ -170,12 +172,22 @@ async def stream_chunk_to_queue(network, queue, method, url, **kwargs): async for chunk in response.aiter_raw(65536): if len(chunk) > 0: queue.put(chunk) + except httpx.ResponseClosed as e: + # the response was closed + pass except (httpx.HTTPError, OSError, h2.exceptions.ProtocolError) as e: queue.put(e) finally: queue.put(None) +def _close_response_method(self): + asyncio.run_coroutine_threadsafe( + self.aclose(), + get_loop() + ) + + def stream(method, url, **kwargs): """Replace httpx.stream. @@ -193,10 +205,19 @@ def stream(method, url, **kwargs): stream_chunk_to_queue(get_network(), queue, method, url, **kwargs), get_loop() ) + + # yield response + response = queue.get() + if isinstance(response, Exception): + raise response + response.close = MethodType(_close_response_method, response) + yield response + + # yield chunks chunk_or_exception = queue.get() while chunk_or_exception is not None: if isinstance(chunk_or_exception, Exception): raise chunk_or_exception yield chunk_or_exception chunk_or_exception = queue.get() - return future.result() + future.result() -- cgit v1.2.3