diff options
Diffstat (limited to 'searx/plugins')
| -rw-r--r-- | searx/plugins/__init__.py | 292 | ||||
| -rw-r--r-- | searx/plugins/_core.py | 394 | ||||
| -rw-r--r-- | searx/plugins/ahmia_filter.py | 14 | ||||
| -rw-r--r-- | searx/plugins/calculator.py | 70 | ||||
| -rw-r--r-- | searx/plugins/hash_plugin.py | 97 | ||||
| -rw-r--r-- | searx/plugins/hostnames.py | 21 | ||||
| -rw-r--r-- | searx/plugins/oa_doi_rewrite.py | 10 | ||||
| -rw-r--r-- | searx/plugins/self_info.py | 75 | ||||
| -rw-r--r-- | searx/plugins/tor_check.py | 53 | ||||
| -rw-r--r-- | searx/plugins/tracker_url_remover.py | 18 | ||||
| -rw-r--r-- | searx/plugins/unit_converter.py | 30 |
11 files changed, 689 insertions, 385 deletions
diff --git a/searx/plugins/__init__.py b/searx/plugins/__init__.py index c3aad5f32..9aaf2f2db 100644 --- a/searx/plugins/__init__.py +++ b/searx/plugins/__init__.py @@ -1,232 +1,68 @@ # SPDX-License-Identifier: AGPL-3.0-or-later -# pylint: disable=missing-module-docstring, missing-class-docstring - -import sys -from hashlib import sha256 -from importlib import import_module -from os import listdir, makedirs, remove, stat, utime -from os.path import abspath, basename, dirname, exists, join -from shutil import copyfile -from pkgutil import iter_modules -from logging import getLogger -from typing import List, Tuple - -from searx import logger, settings - - -class Plugin: # pylint: disable=too-few-public-methods - """This class is currently never initialized and only used for type hinting.""" - - id: str - name: str - description: str - default_on: bool - js_dependencies: Tuple[str] - css_dependencies: Tuple[str] - preference_section: str - - -logger = logger.getChild("plugins") - -required_attrs = ( - # fmt: off - ("name", str), - ("description", str), - ("default_on", bool) - # fmt: on -) - -optional_attrs = ( - # fmt: off - ("js_dependencies", tuple), - ("css_dependencies", tuple), - ("preference_section", str), - # fmt: on -) - - -def sha_sum(filename): - with open(filename, "rb") as f: - file_content_bytes = f.read() - return sha256(file_content_bytes).hexdigest() - - -def sync_resource(base_path, resource_path, name, target_dir, plugin_dir): - dep_path = join(base_path, resource_path) - file_name = basename(dep_path) - resource_path = join(target_dir, file_name) - if not exists(resource_path) or sha_sum(dep_path) != sha_sum(resource_path): - try: - copyfile(dep_path, resource_path) - # copy atime_ns and mtime_ns, so the weak ETags (generated by - # the HTTP server) do not change - dep_stat = stat(dep_path) - utime(resource_path, ns=(dep_stat.st_atime_ns, dep_stat.st_mtime_ns)) - except IOError: - logger.critical("failed to copy plugin resource {0} for plugin {1}".format(file_name, name)) - sys.exit(3) - - # returning with the web path of the resource - return join("plugins/external_plugins", plugin_dir, file_name) - - -def prepare_package_resources(plugin, plugin_module_name): - plugin_base_path = dirname(abspath(plugin.__file__)) - - plugin_dir = plugin_module_name - target_dir = join(settings["ui"]["static_path"], "plugins/external_plugins", plugin_dir) - try: - makedirs(target_dir, exist_ok=True) - except IOError: - logger.critical("failed to create resource directory {0} for plugin {1}".format(target_dir, plugin_module_name)) - sys.exit(3) - - resources = [] - - if hasattr(plugin, "js_dependencies"): - resources.extend(map(basename, plugin.js_dependencies)) - plugin.js_dependencies = [ - sync_resource(plugin_base_path, x, plugin_module_name, target_dir, plugin_dir) - for x in plugin.js_dependencies - ] - - if hasattr(plugin, "css_dependencies"): - resources.extend(map(basename, plugin.css_dependencies)) - plugin.css_dependencies = [ - sync_resource(plugin_base_path, x, plugin_module_name, target_dir, plugin_dir) - for x in plugin.css_dependencies - ] - - for f in listdir(target_dir): - if basename(f) not in resources: - resource_path = join(target_dir, basename(f)) - try: - remove(resource_path) - except IOError: - logger.critical( - "failed to remove unused resource file {0} for plugin {1}".format(resource_path, plugin_module_name) - ) - sys.exit(3) - - -def load_plugin(plugin_module_name, external): - # pylint: disable=too-many-branches - try: - plugin = import_module(plugin_module_name) - except ( - SyntaxError, - KeyboardInterrupt, - SystemExit, - SystemError, - ImportError, - RuntimeError, - ) as e: - logger.critical("%s: fatal exception", plugin_module_name, exc_info=e) - sys.exit(3) - except BaseException: - logger.exception("%s: exception while loading, the plugin is disabled", plugin_module_name) - return None - - # difference with searx: use module name instead of the user name - plugin.id = plugin_module_name - - # - plugin.logger = getLogger(plugin_module_name) - - for plugin_attr, plugin_attr_type in required_attrs: - if not hasattr(plugin, plugin_attr): - logger.critical('%s: missing attribute "%s", cannot load plugin', plugin, plugin_attr) - sys.exit(3) - attr = getattr(plugin, plugin_attr) - if not isinstance(attr, plugin_attr_type): - type_attr = str(type(attr)) - logger.critical( - '{1}: attribute "{0}" is of type {2}, must be of type {3}, cannot load plugin'.format( - plugin, plugin_attr, type_attr, plugin_attr_type - ) - ) - sys.exit(3) - - for plugin_attr, plugin_attr_type in optional_attrs: - if not hasattr(plugin, plugin_attr) or not isinstance(getattr(plugin, plugin_attr), plugin_attr_type): - setattr(plugin, plugin_attr, plugin_attr_type()) - - if not hasattr(plugin, "preference_section"): - plugin.preference_section = "general" - - # query plugin - if plugin.preference_section == "query": - for plugin_attr in ("query_keywords", "query_examples"): - if not hasattr(plugin, plugin_attr): - logger.critical('missing attribute "{0}", cannot load plugin: {1}'.format(plugin_attr, plugin)) - sys.exit(3) - - if settings.get("enabled_plugins"): - # searx compatibility: plugin.name in settings['enabled_plugins'] - plugin.default_on = plugin.name in settings["enabled_plugins"] or plugin.id in settings["enabled_plugins"] - - # copy resources if this is an external plugin - if external: - prepare_package_resources(plugin, plugin_module_name) - - logger.debug("%s: loaded", plugin_module_name) - - return plugin - - -def load_and_initialize_plugin(plugin_module_name, external, init_args): - plugin = load_plugin(plugin_module_name, external) - if plugin and hasattr(plugin, 'init'): - try: - return plugin if plugin.init(*init_args) else None - except Exception: # pylint: disable=broad-except - plugin.logger.exception("Exception while calling init, the plugin is disabled") - return None - return plugin - - -class PluginStore: - def __init__(self): - self.plugins: List[Plugin] = [] - - def __iter__(self): - yield from self.plugins - - def register(self, plugin): - self.plugins.append(plugin) - - def call(self, ordered_plugin_list, plugin_type, *args, **kwargs): - ret = True - for plugin in ordered_plugin_list: - if hasattr(plugin, plugin_type): - try: - ret = getattr(plugin, plugin_type)(*args, **kwargs) - if not ret: - break - except Exception: # pylint: disable=broad-except - plugin.logger.exception("Exception while calling %s", plugin_type) - return ret - - -plugins = PluginStore() - - -def plugin_module_names(): - yield_plugins = set() - - # embedded plugins - for module in iter_modules(path=[dirname(__file__)]): - yield (__name__ + "." + module.name, False) - yield_plugins.add(module.name) - # external plugins - for module_name in settings['plugins']: - if module_name not in yield_plugins: - yield (module_name, True) - yield_plugins.add(module_name) +""".. sidebar:: Further reading .. + + - :ref:`plugins admin` + - :ref:`SearXNG settings <settings plugins>` + - :ref:`builtin plugins` + +Plugins can extend or replace functionality of various components of SearXNG. +Here is an example of a very simple plugin that adds a "Hello" into the answer +area: + +.. code:: python + + from flask_babel import gettext as _ + from searx.plugins import Plugin + from searx.result_types import Answer + + class MyPlugin(Plugin): + + id = "self_info" + default_on = True + + def __init__(self): + super().__init__() + info = PluginInfo(id=self.id, name=_("Hello"), description=_("demo plugin")) + + def post_search(self, request, search): + return [ Answer(answer="Hello") ] + +Entry points (hooks) define when a plugin runs. Right now only three hooks are +implemented. So feel free to implement a hook if it fits the behaviour of your +plugin / a plugin doesn't need to implement all the hooks. + +- pre search: :py:obj:`Plugin.pre_search` +- post search: :py:obj:`Plugin.post_search` +- on each result item: :py:obj:`Plugin.on_result` + +For a coding example have a look at :ref:`self_info plugin`. + +---- + +.. autoclass:: Plugin + :members: + +.. autoclass:: PluginInfo + :members: + +.. autoclass:: PluginStorage + :members: + +.. autoclass:: searx.plugins._core.ModulePlugin + :members: + :show-inheritance: + +""" + +from __future__ import annotations + +__all__ = ["PluginInfo", "Plugin", "PluginStorage"] + +from ._core import PluginInfo, Plugin, PluginStorage + +STORAGE: PluginStorage = PluginStorage() def initialize(app): - for module_name, external in plugin_module_names(): - plugin = load_and_initialize_plugin(module_name, external, (app, settings)) - if plugin: - plugins.register(plugin) + STORAGE.load_builtins() + STORAGE.init(app) diff --git a/searx/plugins/_core.py b/searx/plugins/_core.py new file mode 100644 index 000000000..70e5758ec --- /dev/null +++ b/searx/plugins/_core.py @@ -0,0 +1,394 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +# pylint: disable=too-few-public-methods,missing-module-docstring + +from __future__ import annotations + +__all__ = ["PluginInfo", "Plugin", "PluginStorage"] + +import abc +import importlib +import logging +import pathlib +import types +import typing +import warnings + +from dataclasses import dataclass, field + +import flask + +import searx +from searx.utils import load_module +from searx.extended_types import SXNG_Request +from searx.result_types import Result + + +if typing.TYPE_CHECKING: + from searx.search import SearchWithPlugins + + +_default = pathlib.Path(__file__).parent +log: logging.Logger = logging.getLogger("searx.plugins") + + +@dataclass +class PluginInfo: + """Object that holds informations about a *plugin*, these infos are shown to + the user in the Preferences menu. + + To be able to translate the information into other languages, the text must + be written in English and translated with :py:obj:`flask_babel.gettext`. + """ + + id: str + """The ID-selector in HTML/CSS `#<id>`.""" + + name: str + """Name of the *plugin*.""" + + description: str + """Short description of the *answerer*.""" + + preference_section: typing.Literal["general", "ui", "privacy", "query"] | None = "general" + """Section (tab/group) in the preferences where this plugin is shown to the + user. + + The value ``query`` is reserved for plugins that are activated via a + *keyword* as part of a search query, see: + + - :py:obj:`PluginInfo.examples` + - :py:obj:`Plugin.keywords` + + Those plugins are shown in the preferences in tab *Special Queries*. + """ + + examples: list[str] = field(default_factory=list) + """List of short examples of the usage / of query terms.""" + + keywords: list[str] = field(default_factory=list) + """See :py:obj:`Plugin.keywords`""" + + +class Plugin(abc.ABC): + """Abstract base class of all Plugins.""" + + id: typing.ClassVar[str] + """The ID (suffix) in the HTML form.""" + + default_on: typing.ClassVar[bool] + """Plugin is enabled/disabled by default.""" + + keywords: list[str] = [] + """Keywords in the search query that activate the plugin. The *keyword* is + the first word in a search query. If a plugin should be executed regardless + of the search query, the list of keywords should be empty (which is also the + default in the base class for Plugins).""" + + log: logging.Logger + """A logger object, is automatically initialized when calling the + constructor (if not already set in the subclass).""" + + info: PluginInfo + """Informations about the *plugin*, see :py:obj:`PluginInfo`.""" + + def __init__(self) -> None: + super().__init__() + + for attr in ["id", "default_on"]: + if getattr(self, attr, None) is None: + raise NotImplementedError(f"plugin {self} is missing attribute {attr}") + + if not self.id: + self.id = f"{self.__class__.__module__}.{self.__class__.__name__}" + if not getattr(self, "log", None): + self.log = log.getChild(self.id) + + def __hash__(self) -> int: + """The hash value is used in :py:obj:`set`, for example, when an object + is added to the set. The hash value is also used in other contexts, + e.g. when checking for equality to identify identical plugins from + different sources (name collisions).""" + + return id(self) + + def __eq__(self, other): + """py:obj:`Plugin` objects are equal if the hash values of the two + objects are equal.""" + + return hash(self) == hash(other) + + def init(self, app: flask.Flask) -> bool: # pylint: disable=unused-argument + """Initialization of the plugin, the return value decides whether this + plugin is active or not. Initialization only takes place once, at the + time the WEB application is set up. The base methode always returns + ``True``, the methode can be overwritten in the inheritances, + + - ``True`` plugin is active + - ``False`` plugin is inactive + """ + return True + + # pylint: disable=unused-argument + def pre_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> bool: + """Runs BEFORE the search request and returns a boolean: + + - ``True`` to continue the search + - ``False`` to stop the search + """ + return True + + def on_result(self, request: SXNG_Request, search: "SearchWithPlugins", result: Result) -> bool: + """Runs for each result of each engine and returns a boolean: + + - ``True`` to keep the result + - ``False`` to remove the result from the result list + + The ``result`` can be modified to the needs. + + .. hint:: + + If :py:obj:`Result.url` is modified, :py:obj:`Result.parsed_url` must + be changed accordingly: + + .. code:: python + + result["parsed_url"] = urlparse(result["url"]) + """ + return True + + def post_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> None | typing.Sequence[Result]: + """Runs AFTER the search request. Can return a list of :py:obj:`Result` + objects to be added to the final result list.""" + return + + +class ModulePlugin(Plugin): + """A wrapper class for legacy *plugins*. + + .. note:: + + For internal use only! + + In a module plugin, the follwing names are mapped: + + - `module.query_keywords` --> :py:obj:`Plugin.keywords` + - `module.plugin_id` --> :py:obj:`Plugin.id` + - `module.logger` --> :py:obj:`Plugin.log` + """ + + _required_attrs = (("name", str), ("description", str), ("default_on", bool)) + + def __init__(self, mod: types.ModuleType): + """In case of missing attributes in the module or wrong types are given, + a :py:obj:`TypeError` exception is raised.""" + + self.module = mod + self.id = getattr(self.module, "plugin_id", self.module.__name__) + self.log = logging.getLogger(self.module.__name__) + self.keywords = getattr(self.module, "query_keywords", []) + + for attr, attr_type in self._required_attrs: + if not hasattr(self.module, attr): + msg = f"missing attribute {attr}, cannot load plugin" + self.log.critical(msg) + raise TypeError(msg) + if not isinstance(getattr(self.module, attr), attr_type): + msg = f"attribute {attr} is not of type {attr_type}" + self.log.critical(msg) + raise TypeError(msg) + + self.default_on = mod.default_on + self.info = PluginInfo( + id=self.id, + name=self.module.name, + description=self.module.description, + preference_section=getattr(self.module, "preference_section", None), + examples=getattr(self.module, "query_examples", []), + keywords=self.keywords, + ) + + # monkeypatch module + self.module.logger = self.log # type: ignore + + super().__init__() + + def init(self, app: flask.Flask) -> bool: + if not hasattr(self.module, "init"): + return True + return self.module.init(app) + + def pre_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> bool: + if not hasattr(self.module, "pre_search"): + return True + return self.module.pre_search(request, search) + + def on_result(self, request: SXNG_Request, search: "SearchWithPlugins", result: Result) -> bool: + if not hasattr(self.module, "on_result"): + return True + return self.module.on_result(request, search, result) + + def post_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> None | list[Result]: + if not hasattr(self.module, "post_search"): + return None + return self.module.post_search(request, search) + + +class PluginStorage: + """A storage for managing the *plugins* of SearXNG.""" + + plugin_list: set[Plugin] + """The list of :py:obj:`Plugins` in this storage.""" + + legacy_plugins = [ + "ahmia_filter", + "calculator", + "hostnames", + "oa_doi_rewrite", + "tor_check", + "tracker_url_remover", + "unit_converter", + ] + """Internal plugins implemented in the legacy style (as module / deprecated!).""" + + def __init__(self): + self.plugin_list = set() + + def __iter__(self): + + yield from self.plugin_list + + def __len__(self): + return len(self.plugin_list) + + @property + def info(self) -> list[PluginInfo]: + return [p.info for p in self.plugin_list] + + def load_builtins(self): + """Load plugin modules from: + + - the python packages in :origin:`searx/plugins` and + - the external plugins from :ref:`settings plugins`. + """ + + for f in _default.iterdir(): + + if f.name.startswith("_"): + continue + + if f.stem not in self.legacy_plugins: + self.register_by_fqn(f"searx.plugins.{f.stem}.SXNGPlugin") + continue + + # for backward compatibility + mod = load_module(f.name, str(f.parent)) + self.register(ModulePlugin(mod)) + + for fqn in searx.get_setting("plugins"): # type: ignore + self.register_by_fqn(fqn) + + def register(self, plugin: Plugin): + """Register a :py:obj:`Plugin`. In case of name collision (if two + plugins have same ID) a :py:obj:`KeyError` exception is raised. + """ + + if plugin in self.plugin_list: + msg = f"name collision '{plugin.id}'" + plugin.log.critical(msg) + raise KeyError(msg) + + self.plugin_list.add(plugin) + plugin.log.debug("plugin has been loaded") + + def register_by_fqn(self, fqn: str): + """Register a :py:obj:`Plugin` via its fully qualified class name (FQN). + The FQNs of external plugins could be read from a configuration, for + example, and registered using this method + """ + + mod_name, _, obj_name = fqn.rpartition('.') + if not mod_name: + # for backward compatibility + code_obj = importlib.import_module(fqn) + else: + mod = importlib.import_module(mod_name) + code_obj = getattr(mod, obj_name, None) + + if code_obj is None: + msg = f"plugin {fqn} is not implemented" + log.critical(msg) + raise ValueError(msg) + + if isinstance(code_obj, types.ModuleType): + # for backward compatibility + warnings.warn( + f"plugin {fqn} is implemented in a legacy module / migrate to searx.plugins.Plugin", DeprecationWarning + ) + self.register(ModulePlugin(code_obj)) + return + + self.register(code_obj()) + + def init(self, app: flask.Flask) -> None: + """Calls the method :py:obj:`Plugin.init` of each plugin in this + storage. Depending on its return value, the plugin is removed from + *this* storage or not.""" + + for plg in self.plugin_list.copy(): + if not plg.init(app): + self.plugin_list.remove(plg) + + def pre_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> bool: + + ret = True + for plugin in [p for p in self.plugin_list if p.id in search.user_plugins]: + try: + ret = bool(plugin.pre_search(request=request, search=search)) + except Exception: # pylint: disable=broad-except + plugin.log.exception("Exception while calling pre_search") + continue + if not ret: + # skip this search on the first False from a plugin + break + return ret + + def on_result(self, request: SXNG_Request, search: "SearchWithPlugins", result: Result) -> bool: + + ret = True + for plugin in [p for p in self.plugin_list if p.id in search.user_plugins]: + try: + ret = bool(plugin.on_result(request=request, search=search, result=result)) + except Exception: # pylint: disable=broad-except + plugin.log.exception("Exception while calling on_result") + continue + if not ret: + # ignore this result item on the first False from a plugin + break + + return ret + + def post_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> None: + """Extend :py:obj:`search.result_container + <searx.results.ResultContainer`> with result items from plugins listed + in :py:obj:`search.user_plugins <SearchWithPlugins.user_plugins>`. + """ + + keyword = None + for keyword in search.search_query.query.split(): + if keyword: + break + + for plugin in [p for p in self.plugin_list if p.id in search.user_plugins]: + + if plugin.keywords: + # plugin with keywords: skip plugin if no keyword match + if keyword and keyword not in plugin.keywords: + continue + try: + results = plugin.post_search(request=request, search=search) or [] + except Exception: # pylint: disable=broad-except + plugin.log.exception("Exception while calling post_search") + continue + + # In case of *plugins* prefix ``plugin:`` is set, see searx.result_types.Result + search.result_container.extend(f"plugin: {plugin.id}", results) diff --git a/searx/plugins/ahmia_filter.py b/searx/plugins/ahmia_filter.py index bbf137103..c1252b147 100644 --- a/searx/plugins/ahmia_filter.py +++ b/searx/plugins/ahmia_filter.py @@ -1,27 +1,33 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # pylint: disable=missing-module-docstring +from __future__ import annotations from hashlib import md5 + +import flask + from searx.data import ahmia_blacklist_loader +from searx import get_setting + name = "Ahmia blacklist" description = "Filter out onion results that appear in Ahmia's blacklist. (See https://ahmia.fi/blacklist)" default_on = True preference_section = 'onions' -ahmia_blacklist = None +ahmia_blacklist: list = [] -def on_result(_request, _search, result): +def on_result(_request, _search, result) -> bool: if not result.get('is_onion') or not result.get('parsed_url'): return True result_hash = md5(result['parsed_url'].hostname.encode()).hexdigest() return result_hash not in ahmia_blacklist -def init(_app, settings): +def init(app=flask.Flask) -> bool: # pylint: disable=unused-argument global ahmia_blacklist # pylint: disable=global-statement - if not settings['outgoing']['using_tor_proxy']: + if not get_setting("outgoing.using_tor_proxy"): # disable the plugin return False ahmia_blacklist = ahmia_blacklist_loader() diff --git a/searx/plugins/calculator.py b/searx/plugins/calculator.py index e92ff9d91..162f3c3c6 100644 --- a/searx/plugins/calculator.py +++ b/searx/plugins/calculator.py @@ -1,28 +1,27 @@ # SPDX-License-Identifier: AGPL-3.0-or-later -"""Calculate mathematical expressions using ack#eval +"""Calculate mathematical expressions using :py:obj`ast.parse` (mode="eval"). """ +from __future__ import annotations +from typing import Callable + import ast import re import operator -from multiprocessing import Process, Queue -from typing import Callable +import multiprocessing -import flask import babel +import babel.numbers from flask_babel import gettext -from searx.plugins import logger +from searx.result_types import Answer name = "Basic Calculator" description = gettext("Calculate mathematical expressions via the search bar") default_on = True - preference_section = 'general' plugin_id = 'calculator' -logger = logger.getChild(plugin_id) - operators: dict[type, Callable] = { ast.Add: operator.add, ast.Sub: operator.sub, @@ -33,11 +32,17 @@ operators: dict[type, Callable] = { ast.USub: operator.neg, } +# with multiprocessing.get_context("fork") we are ready for Py3.14 (by emulating +# the old behavior "fork") but it will not solve the core problem of fork, nor +# will it remove the deprecation warnings in py3.12 & py3.13. Issue is +# ddiscussed here: https://github.com/searxng/searxng/issues/4159 +mp_fork = multiprocessing.get_context("fork") + def _eval_expr(expr): """ >>> _eval_expr('2^6') - 4 + 64 >>> _eval_expr('2**6') 64 >>> _eval_expr('1 + 2*3**(4^5) / (6 + -7)') @@ -63,46 +68,49 @@ def _eval(node): raise TypeError(node) -def timeout_func(timeout, func, *args, **kwargs): +def handler(q: multiprocessing.Queue, func, args, **kwargs): # pylint:disable=invalid-name + try: + q.put(func(*args, **kwargs)) + except: + q.put(None) + raise + - def handler(q: Queue, func, args, **kwargs): # pylint:disable=invalid-name - try: - q.put(func(*args, **kwargs)) - except: - q.put(None) - raise +def timeout_func(timeout, func, *args, **kwargs): - que = Queue() - p = Process(target=handler, args=(que, func, args), kwargs=kwargs) + que = mp_fork.Queue() + p = mp_fork.Process(target=handler, args=(que, func, args), kwargs=kwargs) p.start() p.join(timeout=timeout) ret_val = None + # pylint: disable=used-before-assignment,undefined-variable if not p.is_alive(): ret_val = que.get() else: - logger.debug("terminate function after timeout is exceeded") + logger.debug("terminate function after timeout is exceeded") # type: ignore p.terminate() p.join() p.close() return ret_val -def post_search(_request, search): +def post_search(request, search) -> list[Answer]: + results = [] # only show the result of the expression on the first page if search.search_query.pageno > 1: - return True + return results query = search.search_query.query # in order to avoid DoS attacks with long expressions, ignore long expressions if len(query) > 100: - return True + return results # replace commonly used math operators with their proper Python operator query = query.replace("x", "*").replace(":", "/") # use UI language - ui_locale = babel.Locale.parse(flask.request.preferences.get_value('locale'), sep='-') + ui_locale = babel.Locale.parse(request.preferences.get_value('locale'), sep='-') # parse the number system in a localized way def _decimal(match: re.Match) -> str: @@ -116,15 +124,17 @@ def post_search(_request, search): # only numbers and math operators are accepted if any(str.isalpha(c) for c in query): - return True + return results # in python, powers are calculated via ** query_py_formatted = query.replace("^", "**") # Prevent the runtime from being longer than 50 ms - result = timeout_func(0.05, _eval_expr, query_py_formatted) - if result is None or result == "": - return True - result = babel.numbers.format_decimal(result, locale=ui_locale) - search.result_container.answers['calculate'] = {'answer': f"{search.search_query.query} = {result}"} - return True + res = timeout_func(0.05, _eval_expr, query_py_formatted) + if res is None or res == "": + return results + + res = babel.numbers.format_decimal(res, locale=ui_locale) + Answer(results=results, answer=f"{search.search_query.query} = {res}") + + return results diff --git a/searx/plugins/hash_plugin.py b/searx/plugins/hash_plugin.py index c27e2a432..9db3748b3 100644 --- a/searx/plugins/hash_plugin.py +++ b/searx/plugins/hash_plugin.py @@ -1,43 +1,66 @@ # SPDX-License-Identifier: AGPL-3.0-or-later -# pylint: disable=missing-module-docstring +# pylint: disable=missing-module-docstring, missing-class-docstring +from __future__ import annotations +import typing -import hashlib import re +import hashlib from flask_babel import gettext -name = "Hash plugin" -description = gettext("Converts strings to different hash digests.") -default_on = True -preference_section = 'query' -query_keywords = ['md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512'] -query_examples = 'sha512 The quick brown fox jumps over the lazy dog' - -parser_re = re.compile('(md5|sha1|sha224|sha256|sha384|sha512) (.*)', re.I) - - -def post_search(_request, search): - # process only on first page - if search.search_query.pageno > 1: - return True - m = parser_re.match(search.search_query.query) - if not m: - # wrong query - return True - - function, string = m.groups() - if not string.strip(): - # end if the string is empty - return True - - # select hash function - f = hashlib.new(function.lower()) - - # make digest from the given string - f.update(string.encode('utf-8').strip()) - answer = function + " " + gettext('hash digest') + ": " + f.hexdigest() - - # print result - search.result_container.answers.clear() - search.result_container.answers['hash'] = {'answer': answer} - return True +from searx.plugins import Plugin, PluginInfo +from searx.result_types import Answer + +if typing.TYPE_CHECKING: + from searx.search import SearchWithPlugins + from searx.extended_types import SXNG_Request + + +class SXNGPlugin(Plugin): + """Plugin converts strings to different hash digests. The results are + displayed in area for the "answers". + """ + + id = "hash_plugin" + default_on = True + keywords = ["md5", "sha1", "sha224", "sha256", "sha384", "sha512"] + + def __init__(self): + super().__init__() + + self.parser_re = re.compile(f"({'|'.join(self.keywords)}) (.*)", re.I) + self.info = PluginInfo( + id=self.id, + name=gettext("Hash plugin"), + description=gettext("Converts strings to different hash digests."), + examples=["sha512 The quick brown fox jumps over the lazy dog"], + preference_section="query", + ) + + def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> list[Answer]: + """Returns a result list only for the first page.""" + results = [] + + if search.search_query.pageno > 1: + return results + + m = self.parser_re.match(search.search_query.query) + if not m: + # wrong query + return results + + function, string = m.groups() + if not string.strip(): + # end if the string is empty + return results + + # select hash function + f = hashlib.new(function.lower()) + + # make digest from the given string + f.update(string.encode("utf-8").strip()) + answer = function + " " + gettext("hash digest") + ": " + f.hexdigest() + + Answer(results=results, answer=answer) + + return results diff --git a/searx/plugins/hostnames.py b/searx/plugins/hostnames.py index 6519452db..f2d829103 100644 --- a/searx/plugins/hostnames.py +++ b/searx/plugins/hostnames.py @@ -91,15 +91,17 @@ something like this: """ +from __future__ import annotations + import re from urllib.parse import urlunparse, urlparse from flask_babel import gettext from searx import settings -from searx.plugins import logger from searx.settings_loader import get_yaml_cfg + name = gettext('Hostnames plugin') description = gettext('Rewrite hostnames, remove results or prioritize them based on the hostname') default_on = False @@ -107,16 +109,15 @@ preference_section = 'general' plugin_id = 'hostnames' -logger = logger.getChild(plugin_id) parsed = 'parsed_url' _url_fields = ['iframe_src', 'audio_src'] -def _load_regular_expressions(settings_key): +def _load_regular_expressions(settings_key) -> dict | set | None: setting_value = settings.get(plugin_id, {}).get(settings_key) if not setting_value: - return {} + return None # load external file with configuration if isinstance(setting_value, str): @@ -128,20 +129,20 @@ def _load_regular_expressions(settings_key): if isinstance(setting_value, dict): return {re.compile(p): r for (p, r) in setting_value.items()} - return {} + return None -replacements = _load_regular_expressions('replace') -removables = _load_regular_expressions('remove') -high_priority = _load_regular_expressions('high_priority') -low_priority = _load_regular_expressions('low_priority') +replacements: dict = _load_regular_expressions('replace') or {} # type: ignore +removables: set = _load_regular_expressions('remove') or set() # type: ignore +high_priority: set = _load_regular_expressions('high_priority') or set() # type: ignore +low_priority: set = _load_regular_expressions('low_priority') or set() # type: ignore def _matches_parsed_url(result, pattern): return parsed in result and pattern.search(result[parsed].netloc) -def on_result(_request, _search, result): +def on_result(_request, _search, result) -> bool: for pattern, replacement in replacements.items(): if _matches_parsed_url(result, pattern): # logger.debug(result['url']) diff --git a/searx/plugins/oa_doi_rewrite.py b/searx/plugins/oa_doi_rewrite.py index 6a214282c..be5a8d4a4 100644 --- a/searx/plugins/oa_doi_rewrite.py +++ b/searx/plugins/oa_doi_rewrite.py @@ -1,18 +1,21 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # pylint: disable=missing-module-docstring +from __future__ import annotations import re from urllib.parse import urlparse, parse_qsl from flask_babel import gettext + from searx import settings + regex = re.compile(r'10\.\d{4,9}/[^\s]+') name = gettext('Open Access DOI rewrite') description = gettext('Avoid paywalls by redirecting to open-access versions of publications when available') default_on = False -preference_section = 'general' +preference_section = 'general/doi_resolver' def extract_doi(url): @@ -34,8 +37,9 @@ def get_doi_resolver(preferences): return doi_resolvers[selected_resolver] -def on_result(request, _search, result): - if 'parsed_url' not in result: +def on_result(request, _search, result) -> bool: + + if not result.parsed_url: return True doi = extract_doi(result['parsed_url']) diff --git a/searx/plugins/self_info.py b/searx/plugins/self_info.py index 7cad040d2..d1d7f12b9 100644 --- a/searx/plugins/self_info.py +++ b/searx/plugins/self_info.py @@ -1,32 +1,57 @@ # SPDX-License-Identifier: AGPL-3.0-or-later -# pylint: disable=missing-module-docstring,invalid-name +# pylint: disable=missing-module-docstring, missing-class-docstring +from __future__ import annotations +import typing import re from flask_babel import gettext from searx.botdetection._helpers import get_real_ip +from searx.result_types import Answer -name = gettext('Self Information') -description = gettext('Displays your IP if the query is "ip" and your user agent if the query contains "user agent".') -default_on = True -preference_section = 'query' -query_keywords = ['user-agent'] -query_examples = '' - -# "ip" or "my ip" regex -ip_regex = re.compile('^ip$|my ip', re.IGNORECASE) - -# Self User Agent regex -ua_regex = re.compile('.*user[ -]agent.*', re.IGNORECASE) - - -def post_search(request, search): - if search.search_query.pageno > 1: - return True - if ip_regex.search(search.search_query.query): - ip = get_real_ip(request) - search.result_container.answers['ip'] = {'answer': gettext('Your IP is: ') + ip} - elif ua_regex.match(search.search_query.query): - ua = request.user_agent - search.result_container.answers['user-agent'] = {'answer': gettext('Your user-agent is: ') + ua.string} - return True +from . import Plugin, PluginInfo + +if typing.TYPE_CHECKING: + from searx.search import SearchWithPlugins + from searx.extended_types import SXNG_Request + + +class SXNGPlugin(Plugin): + """Simple plugin that displays information about user's request, including + the IP or HTTP User-Agent. The information is displayed in area for the + "answers". + """ + + id = "self_info" + default_on = True + keywords = ["ip", "user-agent"] + + def __init__(self): + super().__init__() + + self.ip_regex = re.compile(r"^ip", re.IGNORECASE) + self.ua_regex = re.compile(r"^user-agent", re.IGNORECASE) + + self.info = PluginInfo( + id=self.id, + name=gettext("Self Information"), + description=gettext( + """Displays your IP if the query is "ip" and your user agent if the query is "user-agent".""" + ), + preference_section="query", + ) + + def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> list[Answer]: + """Returns a result list only for the first page.""" + results = [] + + if search.search_query.pageno > 1: + return results + + if self.ip_regex.search(search.search_query.query): + Answer(results=results, answer=gettext("Your IP is: ") + get_real_ip(request)) + + if self.ua_regex.match(search.search_query.query): + Answer(results=results, answer=gettext("Your user-agent is: ") + str(request.user_agent)) + + return results diff --git a/searx/plugins/tor_check.py b/searx/plugins/tor_check.py index 3816d8ece..95281eb42 100644 --- a/searx/plugins/tor_check.py +++ b/searx/plugins/tor_check.py @@ -1,8 +1,8 @@ # SPDX-License-Identifier: AGPL-3.0-or-later """A plugin to check if the ip address of the request is a Tor exit-node if the user searches for ``tor-check``. It fetches the tor exit node list from -https://check.torproject.org/exit-addresses and parses all the IPs into a list, -then checks if the user's IP address is in it. +:py:obj:`url_exit_list` and parses all the IPs into a list, then checks if the +user's IP address is in it. Enable in ``settings.yml``: @@ -14,10 +14,15 @@ Enable in ``settings.yml``: """ +from __future__ import annotations + import re from flask_babel import gettext from httpx import HTTPError + from searx.network import get +from searx.result_types import Answer + default_on = False @@ -42,27 +47,28 @@ query_examples = '' # Regex for exit node addresses in the list. reg = re.compile(r"(?<=ExitAddress )\S+") +url_exit_list = "https://check.torproject.org/exit-addresses" +"""URL to load Tor exit list from.""" -def post_search(request, search): + +def post_search(request, search) -> list[Answer]: + results = [] if search.search_query.pageno > 1: - return True + return results if search.search_query.query.lower() == "tor-check": # Request the list of tor exit nodes. try: - resp = get("https://check.torproject.org/exit-addresses") - node_list = re.findall(reg, resp.text) + resp = get(url_exit_list) + node_list = re.findall(reg, resp.text) # type: ignore except HTTPError: # No answer, return error - search.result_container.answers["tor"] = { - "answer": gettext( - "Could not download the list of Tor exit-nodes from: https://check.torproject.org/exit-addresses" - ) - } - return True + msg = gettext("Could not download the list of Tor exit-nodes from") + Answer(results=results, answer=f"{msg} {url_exit_list}") + return results x_forwarded_for = request.headers.getlist("X-Forwarded-For") @@ -72,20 +78,11 @@ def post_search(request, search): ip_address = request.remote_addr if ip_address in node_list: - search.result_container.answers["tor"] = { - "answer": gettext( - "You are using Tor and it looks like you have this external IP address: {ip_address}".format( - ip_address=ip_address - ) - ) - } + msg = gettext("You are using Tor and it looks like you have the external IP address") + Answer(results=results, answer=f"{msg} {ip_address}") + else: - search.result_container.answers["tor"] = { - "answer": gettext( - "You are not using Tor and you have this external IP address: {ip_address}".format( - ip_address=ip_address - ) - ) - } - - return True + msg = gettext("You are not using Tor and you have the external IP address") + Answer(results=results, answer=f"{msg} {ip_address}") + + return results diff --git a/searx/plugins/tracker_url_remover.py b/searx/plugins/tracker_url_remover.py index 2961cd026..f33f7fdfd 100644 --- a/searx/plugins/tracker_url_remover.py +++ b/searx/plugins/tracker_url_remover.py @@ -1,6 +1,8 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # pylint: disable=missing-module-docstring +from __future__ import annotations + import re from urllib.parse import urlunparse, parse_qsl, urlencode @@ -19,24 +21,24 @@ default_on = True preference_section = 'privacy' -def on_result(_request, _search, result): - if 'parsed_url' not in result: - return True +def on_result(_request, _search, result) -> bool: - query = result['parsed_url'].query + parsed_url = getattr(result, "parsed_url", None) + if not parsed_url: + return True - if query == "": + if parsed_url.query == "": return True - parsed_query = parse_qsl(query) + parsed_query = parse_qsl(parsed_url.query) changes = 0 for i, (param_name, _) in enumerate(list(parsed_query)): for reg in regexes: if reg.match(param_name): parsed_query.pop(i - changes) changes += 1 - result['parsed_url'] = result['parsed_url']._replace(query=urlencode(parsed_query)) - result['url'] = urlunparse(result['parsed_url']) + result.parsed_url = result.parsed_url._replace(query=urlencode(parsed_query)) + result.url = urlunparse(result.parsed_url) break return True diff --git a/searx/plugins/unit_converter.py b/searx/plugins/unit_converter.py index c85996a76..cdd8287fe 100644 --- a/searx/plugins/unit_converter.py +++ b/searx/plugins/unit_converter.py @@ -18,11 +18,14 @@ Enable in ``settings.yml``: """ +from __future__ import annotations import re import babel.numbers + from flask_babel import gettext, get_locale from searx import data +from searx.result_types import Answer name = "Unit converter plugin" @@ -171,16 +174,16 @@ def symbol_to_si(): return SYMBOL_TO_SI -def _parse_text_and_convert(search, from_query, to_query): +def _parse_text_and_convert(from_query, to_query) -> str | None: # pylint: disable=too-many-branches, too-many-locals if not (from_query and to_query): - return + return None measured = re.match(RE_MEASURE, from_query, re.VERBOSE) if not (measured and measured.group('number'), measured.group('unit')): - return + return None # Symbols are not unique, if there are several hits for the from-unit, then # the correct one must be determined by comparing it with the to-unit @@ -198,7 +201,7 @@ def _parse_text_and_convert(search, from_query, to_query): target_list.append((si_name, from_si, orig_symbol)) if not (source_list and target_list): - return + return None source_to_si = target_from_si = target_symbol = None @@ -212,7 +215,7 @@ def _parse_text_and_convert(search, from_query, to_query): target_symbol = target[2] if not (source_to_si and target_from_si): - return + return None _locale = get_locale() or 'en_US' @@ -239,25 +242,28 @@ def _parse_text_and_convert(search, from_query, to_query): else: result = babel.numbers.format_decimal(value, locale=_locale, format='#,##0.##########;-#') - search.result_container.answers['conversion'] = {'answer': f'{result} {target_symbol}'} + return f'{result} {target_symbol}' + +def post_search(_request, search) -> list[Answer]: + results = [] -def post_search(_request, search): # only convert between units on the first page if search.search_query.pageno > 1: - return True + return results query = search.search_query.query query_parts = query.split(" ") if len(query_parts) < 3: - return True + return results for query_part in query_parts: for keyword in CONVERT_KEYWORDS: if query_part == keyword: from_query, to_query = query.split(keyword, 1) - _parse_text_and_convert(search, from_query.strip(), to_query.strip()) - return True + target_val = _parse_text_and_convert(from_query.strip(), to_query.strip()) + if target_val: + Answer(results=results, answer=target_val) - return True + return results |