summaryrefslogtreecommitdiff
path: root/searx/network/__init__.py
diff options
context:
space:
mode:
authorAlexandre Flament <alex@al-f.net>2021-09-28 15:26:34 +0200
committerAlexandre Flament <alex@al-f.net>2021-09-28 19:28:12 +0200
commit29893cf816ab7dccaa68697d5600326b82606972 (patch)
treef99c4d26740560e9e20ac693e6a9588afc720ab4 /searx/network/__init__.py
parent2eab89b4ca12a404390690210f885664fa26c173 (diff)
[fix] searx.network.stream: fix memory leak
Diffstat (limited to 'searx/network/__init__.py')
-rw-r--r--  searx/network/__init__.py | 47
1 files changed, 31 insertions, 16 deletions
diff --git a/searx/network/__init__.py b/searx/network/__init__.py
index 260d4f105..37df0c85a 100644
--- a/searx/network/__init__.py
+++ b/searx/network/__init__.py
@@ -9,6 +9,7 @@ from types import MethodType
from timeit import default_timer
import httpx
+import anyio
import h2.exceptions
from .network import get_network, initialize
@@ -166,7 +167,7 @@ async def stream_chunk_to_queue(network, queue, method, url, **kwargs):
async for chunk in response.aiter_raw(65536):
if len(chunk) > 0:
queue.put(chunk)
- except httpx.StreamClosed:
+ except (httpx.StreamClosed, anyio.ClosedResourceError):
# the response was queued before the exception.
# the exception was raised on aiter_raw.
# we do nothing here: in the finally block, None will be queued
@@ -183,11 +184,35 @@ async def stream_chunk_to_queue(network, queue, method, url, **kwargs):
queue.put(None)
+def _stream_generator(method, url, **kwargs):
+ queue = SimpleQueue()
+ network = get_context_network()
+ future = asyncio.run_coroutine_threadsafe(
+ stream_chunk_to_queue(network, queue, method, url, **kwargs),
+ get_loop()
+ )
+
+ # yield chunks
+ obj_or_exception = queue.get()
+ while obj_or_exception is not None:
+ if isinstance(obj_or_exception, Exception):
+ raise obj_or_exception
+ yield obj_or_exception
+ obj_or_exception = queue.get()
+ future.result()
+
+
def _close_response_method(self):
asyncio.run_coroutine_threadsafe(
self.aclose(),
get_loop()
)
+ # reach the end of self._generator ( _stream_generator ) to avoid a memory leak.
+ # it makes sure that :
+ # * the httpx response is closed (see the stream_chunk_to_queue function)
+ # * to call future.result() in _stream_generator
+ for _ in self._generator: # pylint: disable=protected-access
+ continue
def stream(method, url, **kwargs):
@@ -202,25 +227,15 @@ def stream(method, url, **kwargs):
httpx.Client.stream requires to write the httpx.HTTPTransport version of
the httpx.AsyncHTTPTransport declared above.
"""
- queue = SimpleQueue()
- network = get_context_network()
- future = asyncio.run_coroutine_threadsafe(
- stream_chunk_to_queue(network, queue, method, url, **kwargs),
- get_loop()
- )
+ generator = _stream_generator(method, url, **kwargs)
# yield response
- response = queue.get()
+ response = next(generator) # pylint: disable=stop-iteration-return
if isinstance(response, Exception):
raise response
+
+ response._generator = generator # pylint: disable=protected-access
response.close = MethodType(_close_response_method, response)
yield response
- # yield chunks
- chunk_or_exception = queue.get()
- while chunk_or_exception is not None:
- if isinstance(chunk_or_exception, Exception):
- raise chunk_or_exception
- yield chunk_or_exception
- chunk_or_exception = queue.get()
- future.result()
+ yield from generator