summaryrefslogtreecommitdiff
path: root/searx/poolrequests.py
diff options
context:
space:
mode:
Diffstat (limited to 'searx/poolrequests.py')
-rw-r--r--searx/poolrequests.py159
1 files changed, 159 insertions, 0 deletions
diff --git a/searx/poolrequests.py b/searx/poolrequests.py
new file mode 100644
index 000000000..f9a9d7719
--- /dev/null
+++ b/searx/poolrequests.py
@@ -0,0 +1,159 @@
+import requests
+
+from itertools import cycle
+from threading import RLock, local
+from searx import settings
+from time import time
+
+
+class HTTPAdapterWithConnParams(requests.adapters.HTTPAdapter):
+
+ def __init__(self, pool_connections=requests.adapters.DEFAULT_POOLSIZE,
+ pool_maxsize=requests.adapters.DEFAULT_POOLSIZE,
+ max_retries=requests.adapters.DEFAULT_RETRIES,
+ pool_block=requests.adapters.DEFAULT_POOLBLOCK,
+ **conn_params):
+ if max_retries == requests.adapters.DEFAULT_RETRIES:
+ self.max_retries = requests.adapters.Retry(0, read=False)
+ else:
+ self.max_retries = requests.adapters.Retry.from_int(max_retries)
+ self.config = {}
+ self.proxy_manager = {}
+
+ super(requests.adapters.HTTPAdapter, self).__init__()
+
+ self._pool_connections = pool_connections
+ self._pool_maxsize = pool_maxsize
+ self._pool_block = pool_block
+ self._conn_params = conn_params
+
+ self.init_poolmanager(pool_connections, pool_maxsize, block=pool_block, **conn_params)
+
+ def __setstate__(self, state):
+ # Can't handle by adding 'proxy_manager' to self.__attrs__ because
+ # because self.poolmanager uses a lambda function, which isn't pickleable.
+ self.proxy_manager = {}
+ self.config = {}
+
+ for attr, value in state.items():
+ setattr(self, attr, value)
+
+ self.init_poolmanager(self._pool_connections, self._pool_maxsize,
+ block=self._pool_block, **self._conn_params)
+
+
+threadLocal = local()
+connect = settings['outgoing'].get('pool_connections', 100) # Magic number kept from previous code
+maxsize = settings['outgoing'].get('pool_maxsize', requests.adapters.DEFAULT_POOLSIZE) # Picked from constructor
+if settings['outgoing'].get('source_ips'):
+ http_adapters = cycle(HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize,
+ source_address=(source_ip, 0))
+ for source_ip in settings['outgoing']['source_ips'])
+ https_adapters = cycle(HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize,
+ source_address=(source_ip, 0))
+ for source_ip in settings['outgoing']['source_ips'])
+else:
+ http_adapters = cycle((HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize), ))
+ https_adapters = cycle((HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize), ))
+
+
+class SessionSinglePool(requests.Session):
+
+ def __init__(self):
+ super(SessionSinglePool, self).__init__()
+
+ # reuse the same adapters
+ with RLock():
+ self.adapters.clear()
+ self.mount('https://', next(https_adapters))
+ self.mount('http://', next(http_adapters))
+
+ def close(self):
+ """Call super, but clear adapters since there are managed globaly"""
+ self.adapters.clear()
+ super(SessionSinglePool, self).close()
+
+
+def set_timeout_for_thread(timeout, start_time=None):
+ threadLocal.timeout = timeout
+ threadLocal.start_time = start_time
+
+
+def reset_time_for_thread():
+ threadLocal.total_time = 0
+
+
+def get_time_for_thread():
+ return threadLocal.total_time
+
+
+def request(method, url, **kwargs):
+ """same as requests/requests/api.py request(...)"""
+ time_before_request = time()
+
+ # session start
+ session = SessionSinglePool()
+
+ # proxies
+ kwargs['proxies'] = settings['outgoing'].get('proxies') or None
+
+ # timeout
+ if 'timeout' in kwargs:
+ timeout = kwargs['timeout']
+ else:
+ timeout = getattr(threadLocal, 'timeout', None)
+ if timeout is not None:
+ kwargs['timeout'] = timeout
+
+ # do request
+ response = session.request(method=method, url=url, **kwargs)
+
+ time_after_request = time()
+
+ # is there a timeout for this engine ?
+ if timeout is not None:
+ timeout_overhead = 0.2 # seconds
+ # start_time = when the user request started
+ start_time = getattr(threadLocal, 'start_time', time_before_request)
+ search_duration = time_after_request - start_time
+ if search_duration > timeout + timeout_overhead:
+ raise requests.exceptions.Timeout(response=response)
+
+ # session end
+ session.close()
+
+ if hasattr(threadLocal, 'total_time'):
+ threadLocal.total_time += time_after_request - time_before_request
+
+ return response
+
+
+def get(url, **kwargs):
+ kwargs.setdefault('allow_redirects', True)
+ return request('get', url, **kwargs)
+
+
+def options(url, **kwargs):
+ kwargs.setdefault('allow_redirects', True)
+ return request('options', url, **kwargs)
+
+
+def head(url, **kwargs):
+ kwargs.setdefault('allow_redirects', False)
+ return request('head', url, **kwargs)
+
+
+def post(url, data=None, **kwargs):
+ return request('post', url, data=data, **kwargs)
+
+
+def put(url, data=None, **kwargs):
+ return request('put', url, data=data, **kwargs)
+
+
+def patch(url, data=None, **kwargs):
+ return request('patch', url, data=data, **kwargs)
+
+
+def delete(url, **kwargs):
+ return request('delete', url, **kwargs)