From 29893cf816ab7dccaa68697d5600326b82606972 Mon Sep 17 00:00:00 2001 From: Alexandre Flament Date: Tue, 28 Sep 2021 15:26:34 +0200 Subject: [fix] searx.network.stream: fix memory leak --- searx/network/__init__.py | 47 +++++++++++++++++++++++++++++++---------------- 1 file changed, 31 insertions(+), 16 deletions(-) (limited to 'searx/network/__init__.py') diff --git a/searx/network/__init__.py b/searx/network/__init__.py index 260d4f105..37df0c85a 100644 --- a/searx/network/__init__.py +++ b/searx/network/__init__.py @@ -9,6 +9,7 @@ from types import MethodType from timeit import default_timer import httpx +import anyio import h2.exceptions from .network import get_network, initialize @@ -166,7 +167,7 @@ async def stream_chunk_to_queue(network, queue, method, url, **kwargs): async for chunk in response.aiter_raw(65536): if len(chunk) > 0: queue.put(chunk) - except httpx.StreamClosed: + except (httpx.StreamClosed, anyio.ClosedResourceError): # the response was queued before the exception. # the exception was raised on aiter_raw. # we do nothing here: in the finally block, None will be queued @@ -183,11 +184,35 @@ async def stream_chunk_to_queue(network, queue, method, url, **kwargs): queue.put(None) +def _stream_generator(method, url, **kwargs): + queue = SimpleQueue() + network = get_context_network() + future = asyncio.run_coroutine_threadsafe( + stream_chunk_to_queue(network, queue, method, url, **kwargs), + get_loop() + ) + + # yield chunks + obj_or_exception = queue.get() + while obj_or_exception is not None: + if isinstance(obj_or_exception, Exception): + raise obj_or_exception + yield obj_or_exception + obj_or_exception = queue.get() + future.result() + + def _close_response_method(self): asyncio.run_coroutine_threadsafe( self.aclose(), get_loop() ) + # reach the end of _self.generator ( _stream_generator ) to an avoid memory leak. + # it makes sure that : + # * the httpx response is closed (see the stream_chunk_to_queue function) + # * to call future.result() in _stream_generator + for _ in self._generator: # pylint: disable=protected-access + continue def stream(method, url, **kwargs): @@ -202,25 +227,15 @@ def stream(method, url, **kwargs): httpx.Client.stream requires to write the httpx.HTTPTransport version of the the httpx.AsyncHTTPTransport declared above. """ - queue = SimpleQueue() - network = get_context_network() - future = asyncio.run_coroutine_threadsafe( - stream_chunk_to_queue(network, queue, method, url, **kwargs), - get_loop() - ) + generator = _stream_generator(method, url, **kwargs) # yield response - response = queue.get() + response = next(generator) # pylint: disable=stop-iteration-return if isinstance(response, Exception): raise response + + response._generator = generator # pylint: disable=protected-access response.close = MethodType(_close_response_method, response) yield response - # yield chunks - chunk_or_exception = queue.get() - while chunk_or_exception is not None: - if isinstance(chunk_or_exception, Exception): - raise chunk_or_exception - yield chunk_or_exception - chunk_or_exception = queue.get() - future.result() + yield from generator -- cgit v1.2.3