summaryrefslogtreecommitdiff
path: root/searx/search/processors/abstract.py
diff options
context:
space:
mode:
authorMarkus Heiser <markus.heiser@darmarit.de>2025-09-11 19:10:27 +0200
committerMarkus Heiser <markus.heiser@darmarIT.de>2025-09-18 19:40:03 +0200
commit8f8343dc0d78bb57215afc3e99fd9000fce6e0cf (patch)
tree7c0aa8587ed4bc47e403b4148a308191e2d21c55 /searx/search/processors/abstract.py
parent23257bddce864cfc44d64324dee36b32b1cf5248 (diff)
[mod] addition of various type hints / engine processors
Continuation of #5147 .. typification of the engine processors. BTW: - removed obsolete engine property https_support - fixed & improved currency_convert - engine instances can now implement a engine.setup method [#5147] https://github.com/searxng/searxng/pull/5147 Signed-off-by: Markus Heiser <markus.heiser@darmarit.de>
Diffstat (limited to 'searx/search/processors/abstract.py')
-rw-r--r--searx/search/processors/abstract.py252
1 files changed, 175 insertions, 77 deletions
diff --git a/searx/search/processors/abstract.py b/searx/search/processors/abstract.py
index 2dd56855a..ec94ed3bf 100644
--- a/searx/search/processors/abstract.py
+++ b/searx/search/processors/abstract.py
@@ -1,7 +1,5 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
-"""Abstract base classes for engine request processors.
-
-"""
+"""Abstract base classes for all engine processors."""
import typing as t
@@ -10,25 +8,75 @@ import threading
from abc import abstractmethod, ABC
from timeit import default_timer
-from searx import settings, logger
+from searx import get_setting
+from searx import logger
from searx.engines import engines
from searx.network import get_time_for_thread, get_network
from searx.metrics import histogram_observe, counter_inc, count_exception, count_error
-from searx.exceptions import SearxEngineAccessDeniedException, SearxEngineResponseException
+from searx.exceptions import SearxEngineAccessDeniedException
from searx.utils import get_engine_from_settings
if t.TYPE_CHECKING:
+ import types
from searx.enginelib import Engine
+ from searx.search.models import SearchQuery
+ from searx.results import ResultContainer
+ from searx.result_types import Result, LegacyResult # pyright: ignore[reportPrivateLocalImportUsage]
+
+
+logger = logger.getChild("searx.search.processor")
+SUSPENDED_STATUS: dict[int | str, "SuspendedStatus"] = {}
+
+
+class RequestParams(t.TypedDict):
+ """Basic quantity of the Request parameters of all engine types."""
+
+ query: str
+ """Search term, stripped of search syntax arguments."""
+
+ category: str
+ """Current category, like ``general``.
+
+ .. hint::
+
+ This field is deprecated, don't use it in further implementations.
-logger = logger.getChild('searx.search.processor')
-SUSPENDED_STATUS: dict[int | str, 'SuspendedStatus'] = {}
+ This field is currently *arbitrarily* filled with the name of "one""
+ category (the name of the first category of the engine). In practice,
+ however, it is not clear what this "one" category should be; in principle,
+ multiple categories can also be activated in a search.
+ """
+
+ pageno: int
+ """Current page number, where the first page is ``1``."""
+
+ safesearch: t.Literal[0, 1, 2]
+ """Safe-Search filter (0:normal, 1:moderate, 2:strict)."""
+
+ time_range: t.Literal["day", "week", "month", "year"] | None
+ """Time-range filter."""
+
+ engine_data: dict[str, str]
+ """Allows the transfer of (engine specific) data to the next request of the
+ client. In the case of the ``online`` engines, this data is delivered to
+ the client via the HTML ``<form>`` in response.
+
+ If the client then sends this form back to the server with the next request,
+ this data will be available.
+
+ This makes it possible to carry data from one request to the next without a
+ session context, but this feature (is fragile) and should only be used in
+ exceptional cases. See also :ref:`engine_data`."""
+
+ searxng_locale: str
+ """Language / locale filter from the search request, a string like 'all',
+ 'en', 'en-US', 'zh-HK' .. and others, for more details see
+ :py:obj:`searx.locales`."""
class SuspendedStatus:
"""Class to handle suspend state."""
- __slots__ = 'suspend_end_time', 'suspend_reason', 'continuous_errors', 'lock'
-
def __init__(self):
self.lock: threading.Lock = threading.Lock()
self.continuous_errors: int = 0
@@ -39,18 +87,18 @@ class SuspendedStatus:
def is_suspended(self):
return self.suspend_end_time >= default_timer()
- def suspend(self, suspended_time: int, suspend_reason: str):
+ def suspend(self, suspended_time: int | None, suspend_reason: str):
with self.lock:
# update continuous_errors / suspend_end_time
self.continuous_errors += 1
if suspended_time is None:
- suspended_time = min(
- settings['search']['max_ban_time_on_fail'],
- self.continuous_errors * settings['search']['ban_time_on_fail'],
- )
+ max_ban: int = get_setting("search.max_ban_time_on_fail")
+ ban_fail: int = get_setting("search.ban_time_on_fail")
+ suspended_time = min(max_ban, ban_fail)
+
self.suspend_end_time = default_timer() + suspended_time
self.suspend_reason = suspend_reason
- logger.debug('Suspend for %i seconds', suspended_time)
+ logger.debug("Suspend for %i seconds", suspended_time)
def resume(self):
with self.lock:
@@ -63,31 +111,63 @@ class SuspendedStatus:
class EngineProcessor(ABC):
"""Base classes used for all types of request processors."""
- __slots__ = 'engine', 'engine_name', 'suspended_status', 'logger'
+ engine_type: str
- def __init__(self, engine: "Engine|ModuleType", engine_name: str):
- self.engine: "Engine" = engine
- self.engine_name: str = engine_name
- self.logger: logging.Logger = engines[engine_name].logger
- key = get_network(self.engine_name)
- key = id(key) if key else self.engine_name
+ def __init__(self, engine: "Engine|types.ModuleType"):
+ self.engine: "Engine" = engine # pyright: ignore[reportAttributeAccessIssue]
+ self.logger: logging.Logger = engines[engine.name].logger
+ key = get_network(self.engine.name)
+ key = id(key) if key else self.engine.name
self.suspended_status: SuspendedStatus = SUSPENDED_STATUS.setdefault(key, SuspendedStatus())
- def initialize(self):
- try:
- self.engine.init(get_engine_from_settings(self.engine_name))
- except SearxEngineResponseException as exc:
- self.logger.warning('Fail to initialize // %s', exc)
- except Exception: # pylint: disable=broad-except
- self.logger.exception('Fail to initialize')
- else:
- self.logger.debug('Initialized')
+ def initialize(self, callback: t.Callable[["EngineProcessor", bool], bool]):
+ """Initialization of *this* :py:obj:`EngineProcessor`.
- @property
- def has_initialize_function(self):
- return hasattr(self.engine, 'init')
+ If processor's engine has an ``init`` method, it is called first.
+ Engine's ``init`` method is executed in a thread, meaning that the
+ *registration* (the ``callback``) may occur later and is not already
+ established by the return from this registration method.
+
+ Registration only takes place if the ``init`` method is not available or
+ is successfully run through.
+ """
+
+ if not hasattr(self.engine, "init"):
+ callback(self, True)
+ return
- def handle_exception(self, result_container, exception_or_message, suspend=False):
+ if not callable(self.engine.init):
+ logger.error("Engine's init method isn't a callable (is of type: %s).", type(self.engine.init))
+ callback(self, False)
+ return
+
+ def __init_processor_thread():
+ eng_ok = self.init_engine()
+ callback(self, eng_ok)
+
+ # set up and start a thread
+ threading.Thread(target=__init_processor_thread, daemon=True).start()
+
+ def init_engine(self) -> bool:
+ eng_setting = get_engine_from_settings(self.engine.name)
+ init_ok: bool | None = False
+ try:
+ init_ok = self.engine.init(eng_setting)
+ except Exception: # pylint: disable=broad-except
+ logger.exception("Init method of engine %s failed due to an exception.", self.engine.name)
+ init_ok = False
+ # In older engines, None is returned from the init method, which is
+ # equivalent to indicating that the initialization was successful.
+ if init_ok is None:
+ init_ok = True
+ return init_ok
+
+ def handle_exception(
+ self,
+ result_container: "ResultContainer",
+ exception_or_message: BaseException | str,
+ suspend: bool = False,
+ ):
# update result_container
if isinstance(exception_or_message, BaseException):
exception_class = exception_or_message.__class__
@@ -96,13 +176,13 @@ class EngineProcessor(ABC):
error_message = module_name + exception_class.__qualname__
else:
error_message = exception_or_message
- result_container.add_unresponsive_engine(self.engine_name, error_message)
+ result_container.add_unresponsive_engine(self.engine.name, error_message)
# metrics
- counter_inc('engine', self.engine_name, 'search', 'count', 'error')
+ counter_inc('engine', self.engine.name, 'search', 'count', 'error')
if isinstance(exception_or_message, BaseException):
- count_exception(self.engine_name, exception_or_message)
+ count_exception(self.engine.name, exception_or_message)
else:
- count_error(self.engine_name, exception_or_message)
+ count_error(self.engine.name, exception_or_message)
# suspend the engine ?
if suspend:
suspended_time = None
@@ -110,51 +190,63 @@ class EngineProcessor(ABC):
suspended_time = exception_or_message.suspended_time
self.suspended_status.suspend(suspended_time, error_message) # pylint: disable=no-member
- def _extend_container_basic(self, result_container, start_time, search_results):
+ def _extend_container_basic(
+ self,
+ result_container: "ResultContainer",
+ start_time: float,
+ search_results: "list[Result | LegacyResult]",
+ ):
# update result_container
- result_container.extend(self.engine_name, search_results)
+ result_container.extend(self.engine.name, search_results)
engine_time = default_timer() - start_time
page_load_time = get_time_for_thread()
- result_container.add_timing(self.engine_name, engine_time, page_load_time)
+ result_container.add_timing(self.engine.name, engine_time, page_load_time)
# metrics
- counter_inc('engine', self.engine_name, 'search', 'count', 'successful')
- histogram_observe(engine_time, 'engine', self.engine_name, 'time', 'total')
+ counter_inc('engine', self.engine.name, 'search', 'count', 'successful')
+ histogram_observe(engine_time, 'engine', self.engine.name, 'time', 'total')
if page_load_time is not None:
- histogram_observe(page_load_time, 'engine', self.engine_name, 'time', 'http')
-
- def extend_container(self, result_container, start_time, search_results):
+ histogram_observe(page_load_time, 'engine', self.engine.name, 'time', 'http')
+
+ def extend_container(
+ self,
+ result_container: "ResultContainer",
+ start_time: float,
+ search_results: "list[Result | LegacyResult]|None",
+ ):
if getattr(threading.current_thread(), '_timeout', False):
# the main thread is not waiting anymore
- self.handle_exception(result_container, 'timeout', None)
+ self.handle_exception(result_container, 'timeout', False)
else:
# check if the engine accepted the request
if search_results is not None:
self._extend_container_basic(result_container, start_time, search_results)
self.suspended_status.resume()
- def extend_container_if_suspended(self, result_container):
+ def extend_container_if_suspended(self, result_container: "ResultContainer") -> bool:
if self.suspended_status.is_suspended:
result_container.add_unresponsive_engine(
- self.engine_name, self.suspended_status.suspend_reason, suspended=True
+ self.engine.name, self.suspended_status.suspend_reason, suspended=True
)
return True
return False
- def get_params(self, search_query, engine_category) -> dict[str, t.Any]:
- """Returns a set of (see :ref:`request params <engine request arguments>`) or
- ``None`` if request is not supported.
+ def get_params(self, search_query: "SearchQuery", engine_category: str) -> RequestParams | None:
+ """Returns a dictionary with the :ref:`request parameters <engine
+ request arguments>` (:py:obj:`RequestParams`), if the search condition
+ is not supported by the engine, ``None`` is returned:
- Not supported conditions (``None`` is returned):
+ - *time range* filter in search conditions, but the engine does not have
+ a corresponding filter
+ - page number > 1 when engine does not support paging
+ - page number > ``max_page``
- - A page-number > 1 when engine does not support paging.
- - A time range when the engine does not support time range.
"""
# if paging is not supported, skip
if search_query.pageno > 1 and not self.engine.paging:
return None
# if max page is reached, skip
- max_page = self.engine.max_page or settings['search']['max_page']
+ max_page = self.engine.max_page or get_setting("search.max_page")
if max_page and max_page < search_query.pageno:
return None
@@ -162,39 +254,45 @@ class EngineProcessor(ABC):
if search_query.time_range and not self.engine.time_range_support:
return None
- params = {}
- params["query"] = search_query.query
- params['category'] = engine_category
- params['pageno'] = search_query.pageno
- params['safesearch'] = search_query.safesearch
- params['time_range'] = search_query.time_range
- params['engine_data'] = search_query.engine_data.get(self.engine_name, {})
- params['searxng_locale'] = search_query.lang
-
- # deprecated / vintage --> use params['searxng_locale']
+ params: RequestParams = {
+ "query": search_query.query,
+ "category": engine_category,
+ "pageno": search_query.pageno,
+ "safesearch": search_query.safesearch,
+ "time_range": search_query.time_range,
+ "engine_data": search_query.engine_data.get(self.engine.name, {}),
+ "searxng_locale": search_query.lang,
+ }
+
+ # deprecated / vintage --> use params["searxng_locale"]
#
# Conditions related to engine's traits are implemented in engine.traits
- # module. Don't do 'locale' decisions here in the abstract layer of the
+ # module. Don't do "locale" decisions here in the abstract layer of the
# search processor, just pass the value from user's choice unchanged to
# the engine request.
- if hasattr(self.engine, 'language') and self.engine.language:
- params['language'] = self.engine.language
+ if hasattr(self.engine, "language") and self.engine.language:
+ params["language"] = self.engine.language # pyright: ignore[reportGeneralTypeIssues]
else:
- params['language'] = search_query.lang
+ params["language"] = search_query.lang # pyright: ignore[reportGeneralTypeIssues]
return params
@abstractmethod
- def search(self, query, params, result_container, start_time, timeout_limit):
+ def search(
+ self,
+ query: str,
+ params: RequestParams,
+ result_container: "ResultContainer",
+ start_time: float,
+ timeout_limit: float,
+ ):
pass
def get_tests(self):
- tests = getattr(self.engine, 'tests', None)
- if tests is None:
- tests = getattr(self.engine, 'additional_tests', {})
- tests.update(self.get_default_tests())
- return tests
+ # deprecated!
+ return {}
def get_default_tests(self):
+ # deprecated!
return {}