diff options
Diffstat (limited to 'searx/plugins')
| -rw-r--r-- | searx/plugins/__init__.py | 88 | ||||
| -rw-r--r-- | searx/plugins/_core.py | 237 | ||||
| -rw-r--r-- | searx/plugins/ahmia_filter.py | 57 | ||||
| -rw-r--r-- | searx/plugins/calculator.py | 126 | ||||
| -rw-r--r-- | searx/plugins/hash_plugin.py | 6 | ||||
| -rw-r--r-- | searx/plugins/hostnames.py | 156 | ||||
| -rw-r--r-- | searx/plugins/oa_doi_rewrite.py | 106 | ||||
| -rw-r--r-- | searx/plugins/self_info.py | 6 | ||||
| -rw-r--r-- | searx/plugins/tor_check.py | 95 | ||||
| -rw-r--r-- | searx/plugins/tracker_url_remover.py | 62 | ||||
| -rw-r--r-- | searx/plugins/unit_converter.py | 92 |
11 files changed, 533 insertions, 498 deletions
diff --git a/searx/plugins/__init__.py b/searx/plugins/__init__.py index 9aaf2f2db..4eacf52bd 100644 --- a/searx/plugins/__init__.py +++ b/searx/plugins/__init__.py @@ -3,11 +3,29 @@ - :ref:`plugins admin` - :ref:`SearXNG settings <settings plugins>` - - :ref:`builtin plugins` Plugins can extend or replace functionality of various components of SearXNG. -Here is an example of a very simple plugin that adds a "Hello" into the answer -area: + +Entry points (hooks) define when a plugin runs. Right now only three hooks are +implemented. So feel free to implement a hook if it fits the behaviour of your +plugin / a plugin doesn't need to implement all the hooks. + +- pre search: :py:obj:`Plugin.pre_search` +- post search: :py:obj:`Plugin.post_search` +- on each result item: :py:obj:`Plugin.on_result` + +Below you will find some examples, for more coding examples have a look at the +built-in plugins :origin:`searx/plugins/` or `Only show green hosted results`_. + +.. _Only show green hosted results: + https://github.com/return42/tgwf-searx-plugins/ + + +Add Answer example +================== + +Here is an example of a very simple plugin that adds a "Hello World" into the +answer area: .. code:: python @@ -17,27 +35,51 @@ area: class MyPlugin(Plugin): - id = "self_info" - default_on = True + id = "hello world" - def __init__(self): - super().__init__() - info = PluginInfo(id=self.id, name=_("Hello"), description=_("demo plugin")) + def __init__(self, plg_cfg): + super().__init__(plg_cfg) + self.info = PluginInfo(id=self.id, name=_("Hello"), description=_("demo plugin")) def post_search(self, request, search): - return [ Answer(answer="Hello") ] + return [ Answer(answer="Hello World") ] -Entry points (hooks) define when a plugin runs. Right now only three hooks are -implemented. So feel free to implement a hook if it fits the behaviour of your -plugin / a plugin doesn't need to implement all the hooks. +.. 
_filter urls example: -- pre search: :py:obj:`Plugin.pre_search` -- post search: :py:obj:`Plugin.post_search` -- on each result item: :py:obj:`Plugin.on_result` +Filter URLs example +=================== + +.. sidebar:: Further reading .. + + - :py:obj:`Result.filter_urls(..) <searx.result_types._base.Result.filter_urls>` + +The :py:obj:`Result.filter_urls(..) <searx.result_types._base.Result.filter_urls>` +can be used to filter and/or modify URL fields. In the following example, the +filter function ``my_url_filter``: + +.. code:: python -For a coding example have a look at :ref:`self_info plugin`. + def my_url_filter(result, field_name, url_src) -> bool | str: + if "google" in url_src: + return False # remove URL field from result + if "facebook" in url_src: + new_url = url_src.replace("facebook", "fb-dummy") + return new_url # return modified URL + return True # leave URL in field unchanged ----- +is applied to all URL fields in the :py:obj:`Plugin.on_result` hook: + +.. code:: python + + class MyUrlFilter(Plugin): + ... + def on_result(self, request, search, result) -> bool: + result.filter_urls(my_url_filter) + return True + + +Implementation +============== .. autoclass:: Plugin :members: @@ -48,21 +90,21 @@ For a coding example have a look at :ref:`self_info plugin`. .. autoclass:: PluginStorage :members: -.. autoclass:: searx.plugins._core.ModulePlugin +.. 
autoclass:: PluginCfg :members: - :show-inheritance: - """ from __future__ import annotations -__all__ = ["PluginInfo", "Plugin", "PluginStorage"] +__all__ = ["PluginInfo", "Plugin", "PluginStorage", "PluginCfg"] + -from ._core import PluginInfo, Plugin, PluginStorage +import searx +from ._core import PluginInfo, Plugin, PluginStorage, PluginCfg STORAGE: PluginStorage = PluginStorage() def initialize(app): - STORAGE.load_builtins() + STORAGE.load_settings(searx.get_setting("plugins")) STORAGE.init(app) diff --git a/searx/plugins/_core.py b/searx/plugins/_core.py index 7df9772e9..f5ae56e15 100644 --- a/searx/plugins/_core.py +++ b/searx/plugins/_core.py @@ -3,31 +3,24 @@ from __future__ import annotations -__all__ = ["PluginInfo", "Plugin", "PluginStorage"] +__all__ = ["PluginInfo", "Plugin", "PluginCfg", "PluginStorage"] import abc import importlib +import inspect import logging -import pathlib -import types +import re import typing -import warnings from dataclasses import dataclass, field -import flask - -import searx -from searx.utils import load_module from searx.extended_types import SXNG_Request from searx.result_types import Result - if typing.TYPE_CHECKING: from searx.search import SearchWithPlugins + import flask - -_default = pathlib.Path(__file__).parent log: logging.Logger = logging.getLogger("searx.plugins") @@ -69,14 +62,17 @@ class PluginInfo: """See :py:obj:`Plugin.keywords`""" +ID_REGXP = re.compile("[a-z][a-z0-9].*") + + class Plugin(abc.ABC): """Abstract base class of all Plugins.""" id: str = "" """The ID (suffix) in the HTML form.""" - default_on: bool = False - """Plugin is enabled/disabled by default.""" + active: typing.ClassVar[bool] + """Plugin is enabled/disabled by default (:py:obj:`PluginCfg.active`).""" keywords: list[str] = [] """Keywords in the search query that activate the plugin. 
The *keyword* is @@ -93,19 +89,28 @@ class Plugin(abc.ABC): fqn: str = "" - def __init__(self) -> None: + def __init__(self, plg_cfg: PluginCfg) -> None: super().__init__() if not self.fqn: self.fqn = self.__class__.__mro__[0].__module__ - for attr in ["id", "default_on"]: + # names from the configuration + for n, v in plg_cfg.__dict__.items(): + setattr(self, n, v) + + # names that must be set by the plugin implementation + for attr in [ + "id", + ]: if getattr(self, attr, None) is None: raise NotImplementedError(f"plugin {self} is missing attribute {attr}") - if not self.id: - self.id = f"{self.__class__.__module__}.{self.__class__.__name__}" + if not ID_REGXP.match(self.id): + raise ValueError(f"plugin ID {self.id} contains invalid character (use lowercase ASCII)") + if not getattr(self, "log", None): - self.log = log.getChild(self.id) + pkg_name = inspect.getmodule(self.__class__).__package__ # type: ignore + self.log = logging.getLogger(f"{pkg_name}.{self.id}") def __hash__(self) -> int: """The hash value is used in :py:obj:`set`, for example, when an object @@ -121,7 +126,7 @@ class Plugin(abc.ABC): return hash(self) == hash(other) - def init(self, app: flask.Flask) -> bool: # pylint: disable=unused-argument + def init(self, app: "flask.Flask") -> bool: # pylint: disable=unused-argument """Initialization of the plugin, the return value decides whether this plugin is active or not. Initialization only takes place once, at the time the WEB application is set up. The base method always returns @@ -151,7 +156,8 @@ class Plugin(abc.ABC): .. hint:: - If :py:obj:`Result.url` is modified, :py:obj:`Result.parsed_url` must + If :py:obj:`Result.url <searx.result_types._base.Result.url>` is modified, + :py:obj:`Result.parsed_url <searx.result_types._base.Result.parsed_url>` must be changed accordingly: .. 
code:: python @@ -161,81 +167,24 @@ class Plugin(abc.ABC): return True def post_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> None | typing.Sequence[Result]: - """Runs AFTER the search request. Can return a list of :py:obj:`Result` - objects to be added to the final result list.""" + """Runs AFTER the search request. Can return a list of + :py:obj:`Result <searx.result_types._base.Result>` objects to be added to the + final result list.""" return -class ModulePlugin(Plugin): - """A wrapper class for legacy *plugins*. - - .. note:: - - For internal use only! +@dataclass +class PluginCfg: + """Settings of a plugin. - In a module plugin, the follwing names are mapped: + .. code:: yaml - - `module.query_keywords` --> :py:obj:`Plugin.keywords` - - `module.plugin_id` --> :py:obj:`Plugin.id` - - `module.logger` --> :py:obj:`Plugin.log` + mypackage.mymodule.MyPlugin: + active: true """ - _required_attrs = (("name", str), ("description", str), ("default_on", bool)) - - def __init__(self, mod: types.ModuleType, fqn: str): - """In case of missing attributes in the module or wrong types are given, - a :py:obj:`TypeError` exception is raised.""" - - self.fqn = fqn - self.module = mod - self.id = getattr(self.module, "plugin_id", self.module.__name__) - self.log = logging.getLogger(self.module.__name__) - self.keywords = getattr(self.module, "query_keywords", []) - - for attr, attr_type in self._required_attrs: - if not hasattr(self.module, attr): - msg = f"missing attribute {attr}, cannot load plugin" - self.log.critical(msg) - raise TypeError(msg) - if not isinstance(getattr(self.module, attr), attr_type): - msg = f"attribute {attr} is not of type {attr_type}" - self.log.critical(msg) - raise TypeError(msg) - - self.default_on = mod.default_on - self.info = PluginInfo( - id=self.id, - name=self.module.name, - description=self.module.description, - preference_section=getattr(self.module, "preference_section", None), - examples=getattr(self.module, 
"query_examples", []), - keywords=self.keywords, - ) - - # monkeypatch module - self.module.logger = self.log # type: ignore - - super().__init__() - - def init(self, app: flask.Flask) -> bool: - if not hasattr(self.module, "init"): - return True - return self.module.init(app) - - def pre_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> bool: - if not hasattr(self.module, "pre_search"): - return True - return self.module.pre_search(request, search) - - def on_result(self, request: SXNG_Request, search: "SearchWithPlugins", result: Result) -> bool: - if not hasattr(self.module, "on_result"): - return True - return self.module.on_result(request, search, result) - - def post_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> None | list[Result]: - if not hasattr(self.module, "post_search"): - return None - return self.module.post_search(request, search) + active: bool = False + """Plugin is active by default and the user can *opt-out* in the preferences.""" class PluginStorage: @@ -244,22 +193,10 @@ class PluginStorage: plugin_list: set[Plugin] """The list of :py:obj:`Plugins` in this storage.""" - legacy_plugins = [ - "ahmia_filter", - "calculator", - "hostnames", - "oa_doi_rewrite", - "tor_check", - "tracker_url_remover", - "unit_converter", - ] - """Internal plugins implemented in the legacy style (as module / deprecated!).""" - def __init__(self): self.plugin_list = set() def __iter__(self): - yield from self.plugin_list def __len__(self): @@ -267,102 +204,42 @@ class PluginStorage: @property def info(self) -> list[PluginInfo]: - return [p.info for p in self.plugin_list] - def load_builtins(self): - """Load plugin modules from: - - - the python packages in :origin:`searx/plugins` and - - the external plugins from :ref:`settings plugins`. 
- """ - - for f in _default.iterdir(): - - if f.name.startswith("_"): - continue + return [p.info for p in self.plugin_list] - if f.stem not in self.legacy_plugins: - self.register_by_fqn(f"searx.plugins.{f.stem}.SXNGPlugin") - continue + def load_settings(self, cfg: dict[str, dict]): + """Load plugins configured in SearXNG's settings :ref:`settings + plugins`.""" - # for backward compatibility - mod = load_module(f.name, str(f.parent)) - self.register(ModulePlugin(mod, f"searx.plugins.{f.stem}")) + for fqn, plg_settings in cfg.items(): + cls = None + mod_name, cls_name = fqn.rsplit('.', 1) + try: + mod = importlib.import_module(mod_name) + cls = getattr(mod, cls_name, None) + except Exception as exc: # pylint: disable=broad-exception-caught + log.exception(exc) - for fqn in searx.get_setting("plugins"): # type: ignore - self.register_by_fqn(fqn) + if cls is None: + msg = f"plugin {fqn} is not implemented" + raise ValueError(msg) + plg = cls(PluginCfg(**plg_settings)) + self.register(plg) def register(self, plugin: Plugin): """Register a :py:obj:`Plugin`. In case of name collision (if two plugins have same ID) a :py:obj:`KeyError` exception is raised. """ - if plugin in self.plugin_list: + if plugin in [p.id for p in self.plugin_list]: msg = f"name collision '{plugin.id}'" plugin.log.critical(msg) raise KeyError(msg) - if not plugin.fqn.startswith("searx.plugins."): - self.plugin_list.add(plugin) - plugin.log.debug("plugin has been registered") - return - - # backward compatibility for the enabled_plugins setting - # https://docs.searxng.org/admin/settings/settings_plugins.html#enabled-plugins-internal - en_plgs: list[str] | None = searx.get_setting("enabled_plugins") # type:ignore - - if en_plgs is None: - # enabled_plugins not listed in the /etc/searxng/settings.yml: - # check default_on before register .. 
- if plugin.default_on: - self.plugin_list.add(plugin) - plugin.log.debug("builtin plugin has been registered by SearXNG's defaults") - return - plugin.log.debug("builtin plugin is not registered by SearXNG's defaults") - return - - if plugin.info.name not in en_plgs: - # enabled_plugins listed in the /etc/searxng/settings.yml, - # but this plugin is not listed in: - plugin.log.debug("builtin plugin is not registered by maintainer's settings") - return - - # if the plugin is in enabled_plugins, then it is on by default. - plugin.default_on = True self.plugin_list.add(plugin) - plugin.log.debug("builtin plugin is registered by maintainer's settings") - - def register_by_fqn(self, fqn: str): - """Register a :py:obj:`Plugin` via its fully qualified class name (FQN). - The FQNs of external plugins could be read from a configuration, for - example, and registered using this method - """ - - mod_name, _, obj_name = fqn.rpartition('.') - if not mod_name: - # for backward compatibility - code_obj = importlib.import_module(fqn) - else: - mod = importlib.import_module(mod_name) - code_obj = getattr(mod, obj_name, None) - - if code_obj is None: - msg = f"plugin {fqn} is not implemented" - log.critical(msg) - raise ValueError(msg) - - if isinstance(code_obj, types.ModuleType): - # for backward compatibility - warnings.warn( - f"plugin {fqn} is implemented in a legacy module / migrate to searx.plugins.Plugin", DeprecationWarning - ) - - self.register(ModulePlugin(code_obj, fqn)) - return - - self.register(code_obj()) + plugin.log.debug("plugin has been loaded") - def init(self, app: flask.Flask) -> None: + def init(self, app: "flask.Flask") -> None: """Calls the method :py:obj:`Plugin.init` of each plugin in this storage. 
Depending on its return value, the plugin is removed from *this* storage or not.""" diff --git a/searx/plugins/ahmia_filter.py b/searx/plugins/ahmia_filter.py index 3a6d48eed..a5f6a39ab 100644 --- a/searx/plugins/ahmia_filter.py +++ b/searx/plugins/ahmia_filter.py @@ -1,34 +1,51 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # pylint: disable=missing-module-docstring - from __future__ import annotations +import typing from hashlib import md5 -import flask +from flask_babel import gettext from searx.data import ahmia_blacklist_loader from searx import get_setting +from searx.plugins import Plugin, PluginInfo - -name = "Ahmia blacklist" -description = "Filter out onion results that appear in Ahmia's blacklist. (See https://ahmia.fi/blacklist)" -default_on = True -preference_section = 'onions' +if typing.TYPE_CHECKING: + import flask + from searx.search import SearchWithPlugins + from searx.extended_types import SXNG_Request + from searx.result_types import Result + from searx.plugins import PluginCfg ahmia_blacklist: list = [] -def on_result(_request, _search, result) -> bool: - if not getattr(result, 'is_onion', None) or not getattr(result, 'parsed_url', None): +class SXNGPlugin(Plugin): + """Filter out onion results that appear in Ahmia's blacklist (See https://ahmia.fi/blacklist).""" + + id = "ahmia_filter" + + def __init__(self, plg_cfg: "PluginCfg") -> None: + super().__init__(plg_cfg) + self.info = PluginInfo( + id=self.id, + name=gettext("Ahmia blacklist"), + description=gettext("Filter out onion results that appear in Ahmia's blacklist."), + preference_section="general", + ) + + def on_result( + self, request: "SXNG_Request", search: "SearchWithPlugins", result: Result + ) -> bool: # pylint: disable=unused-argument + if not getattr(result, "is_onion", False) or not getattr(result, "parsed_url", False): + return True + result_hash = md5(result["parsed_url"].hostname.encode()).hexdigest() + return result_hash not in ahmia_blacklist + + def init(self, app: 
"flask.Flask") -> bool: # pylint: disable=unused-argument + global ahmia_blacklist # pylint: disable=global-statement + if not get_setting("outgoing.using_tor_proxy"): + # disable the plugin + return False + ahmia_blacklist = ahmia_blacklist_loader() return True - result_hash = md5(result['parsed_url'].hostname.encode()).hexdigest() - return result_hash not in ahmia_blacklist - - -def init(app=flask.Flask) -> bool: # pylint: disable=unused-argument - global ahmia_blacklist # pylint: disable=global-statement - if not get_setting("outgoing.using_tor_proxy"): - # disable the plugin - return False - ahmia_blacklist = ahmia_blacklist_loader() - return True diff --git a/searx/plugins/calculator.py b/searx/plugins/calculator.py index 11caa272f..0b6a0838e 100644 --- a/searx/plugins/calculator.py +++ b/searx/plugins/calculator.py @@ -1,9 +1,9 @@ # SPDX-License-Identifier: AGPL-3.0-or-later -"""Calculate mathematical expressions using :py:obj`ast.parse` (mode="eval"). +"""Calculate mathematical expressions using :py:obj:`ast.parse` (mode="eval"). """ from __future__ import annotations -from typing import Callable +import typing import ast import re @@ -15,14 +15,78 @@ import babel.numbers from flask_babel import gettext from searx.result_types import EngineResults +from searx.plugins import Plugin, PluginInfo -name = "Basic Calculator" -description = gettext("Calculate mathematical expressions via the search bar") -default_on = True -preference_section = 'general' -plugin_id = 'calculator' +if typing.TYPE_CHECKING: + from searx.search import SearchWithPlugins + from searx.extended_types import SXNG_Request + from searx.plugins import PluginCfg -operators: dict[type, Callable] = { + +class SXNGPlugin(Plugin): + """Plugin calculates mathematical expressions entered in the search bar. The + result is displayed in the area for the "answers". 
+ """ + + id = "calculator" + + def __init__(self, plg_cfg: "PluginCfg") -> None: + super().__init__(plg_cfg) + + self.info = PluginInfo( + id=self.id, + name=gettext("Basic Calculator"), + description=gettext("Calculate mathematical expressions via the search bar"), + preference_section="general", + ) + + def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> EngineResults: + results = EngineResults() + + # only show the result of the expression on the first page + if search.search_query.pageno > 1: + return results + + query = search.search_query.query + # in order to avoid DoS attacks with long expressions, ignore long expressions + if len(query) > 100: + return results + + # replace commonly used math operators with their proper Python operator + query = query.replace("x", "*").replace(":", "/") + + # use UI language + ui_locale = babel.Locale.parse(request.preferences.get_value("locale"), sep="-") + + # parse the number system in a localized way + def _decimal(match: re.Match) -> str: + val = match.string[match.start() : match.end()] + val = babel.numbers.parse_decimal(val, ui_locale, numbering_system="latn") + return str(val) + + decimal = ui_locale.number_symbols["latn"]["decimal"] + group = ui_locale.number_symbols["latn"]["group"] + query = re.sub(f"[0-9]+[{decimal}|{group}][0-9]+[{decimal}|{group}]?[0-9]?", _decimal, query) + + # only numbers and math operators are accepted + if any(str.isalpha(c) for c in query): + return results + + # in python, powers are calculated via ** + query_py_formatted = query.replace("^", "**") + + # Prevent the runtime from being longer than 50 ms + res = timeout_func(0.05, _eval_expr, query_py_formatted) + if res is None or res == "": + return results + + res = babel.numbers.format_decimal(res, locale=ui_locale) + results.add(results.types.Answer(answer=f"{search.search_query.query} = {res}")) + + return results + + +operators: dict[type, typing.Callable] = { ast.Add: operator.add, ast.Sub: 
operator.sub, ast.Mult: operator.mul, @@ -92,49 +156,3 @@ def timeout_func(timeout, func, *args, **kwargs): p.join() p.close() return ret_val - - -def post_search(request, search) -> EngineResults: - results = EngineResults() - - # only show the result of the expression on the first page - if search.search_query.pageno > 1: - return results - - query = search.search_query.query - # in order to avoid DoS attacks with long expressions, ignore long expressions - if len(query) > 100: - return results - - # replace commonly used math operators with their proper Python operator - query = query.replace("x", "*").replace(":", "/") - - # use UI language - ui_locale = babel.Locale.parse(request.preferences.get_value('locale'), sep='-') - - # parse the number system in a localized way - def _decimal(match: re.Match) -> str: - val = match.string[match.start() : match.end()] - val = babel.numbers.parse_decimal(val, ui_locale, numbering_system="latn") - return str(val) - - decimal = ui_locale.number_symbols["latn"]["decimal"] - group = ui_locale.number_symbols["latn"]["group"] - query = re.sub(f"[0-9]+[{decimal}|{group}][0-9]+[{decimal}|{group}]?[0-9]?", _decimal, query) - - # only numbers and math operators are accepted - if any(str.isalpha(c) for c in query): - return results - - # in python, powers are calculated via ** - query_py_formatted = query.replace("^", "**") - - # Prevent the runtime from being longer than 50 ms - res = timeout_func(0.05, _eval_expr, query_py_formatted) - if res is None or res == "": - return results - - res = babel.numbers.format_decimal(res, locale=ui_locale) - results.add(results.types.Answer(answer=f"{search.search_query.query} = {res}")) - - return results diff --git a/searx/plugins/hash_plugin.py b/searx/plugins/hash_plugin.py index 940c895a1..ab0ac010b 100644 --- a/searx/plugins/hash_plugin.py +++ b/searx/plugins/hash_plugin.py @@ -14,6 +14,7 @@ from searx.result_types import EngineResults if typing.TYPE_CHECKING: from searx.search import 
SearchWithPlugins from searx.extended_types import SXNG_Request + from searx.plugins import PluginCfg class SXNGPlugin(Plugin): @@ -22,11 +23,10 @@ class SXNGPlugin(Plugin): """ id = "hash_plugin" - default_on = True keywords = ["md5", "sha1", "sha224", "sha256", "sha384", "sha512"] - def __init__(self): - super().__init__() + def __init__(self, plg_cfg: "PluginCfg") -> None: + super().__init__(plg_cfg) self.parser_re = re.compile(f"({'|'.join(self.keywords)}) (.*)", re.I) self.info = PluginInfo( diff --git a/searx/plugins/hostnames.py b/searx/plugins/hostnames.py index 5f88bcd40..53db5507a 100644 --- a/searx/plugins/hostnames.py +++ b/searx/plugins/hostnames.py @@ -1,19 +1,10 @@ # SPDX-License-Identifier: AGPL-3.0-or-later -# pylint: disable=too-many-branches +# pylint: disable=too-many-branches, unused-argument """ -.. attention:: - The **"Hostname replace"** plugin has been replace by **"Hostnames - plugin"**, see :pull:`3463` & :pull:`3552`. - -The **Hostnames plugin** can be enabled by adding it to the -``enabled_plugins`` **list** in the ``setting.yml`` like so. - - .. code:: yaml - - enabled_plugins: - - 'Hostnames plugin' - ... +During the initialization phase, the plugin checks whether a ``hostnames:`` +configuration exists. If this is not the case, the plugin is not included +in the PluginStorage (it is not available for selection). - ``hostnames.replace``: A **mapping** of regular expressions to hostnames to be replaced by other hostnames. 
@@ -92,6 +83,7 @@ something like this: """ from __future__ import annotations +import typing import re from urllib.parse import urlunparse, urlparse @@ -99,84 +91,114 @@ from urllib.parse import urlunparse, urlparse from flask_babel import gettext from searx import settings +from searx.result_types._base import MainResult, LegacyResult from searx.settings_loader import get_yaml_cfg +from searx.plugins import Plugin, PluginInfo +from ._core import log -name = gettext('Hostnames plugin') -description = gettext('Rewrite hostnames, remove results or prioritize them based on the hostname') -default_on = False -preference_section = 'general' +if typing.TYPE_CHECKING: + import flask + from searx.search import SearchWithPlugins + from searx.extended_types import SXNG_Request + from searx.result_types import Result + from searx.plugins import PluginCfg -plugin_id = 'hostnames' -parsed = 'parsed_url' -_url_fields = ['iframe_src', 'audio_src'] +REPLACE: dict[re.Pattern, str] = {} +REMOVE: set = set() +HIGH: set = set() +LOW: set = set() -def _load_regular_expressions(settings_key) -> dict | set | None: - setting_value = settings.get(plugin_id, {}).get(settings_key) +class SXNGPlugin(Plugin): + """Rewrite hostnames, remove results or prioritize them.""" - if not setting_value: - return None + id = "hostnames" + + def __init__(self, plg_cfg: "PluginCfg") -> None: + super().__init__(plg_cfg) + self.info = PluginInfo( + id=self.id, + name=gettext("Hostnames plugin"), + description=gettext("Rewrite hostnames, remove results or prioritize them based on the hostname"), + preference_section="general", + ) - # load external file with configuration - if isinstance(setting_value, str): - setting_value = get_yaml_cfg(setting_value) + def on_result(self, request: "SXNG_Request", search: "SearchWithPlugins", result: Result) -> bool: - if isinstance(setting_value, list): - return {re.compile(r) for r in setting_value} + for pattern in REMOVE: + if result.parsed_url and 
pattern.search(result.parsed_url.netloc): + # if the link (parsed_url) of the result match, then remove the + # result from the result list, in any other case, the result + # remains in the list / see final "return True" below. + # log.debug("FIXME: remove [url/parsed_url] %s %s", pattern.pattern, result.url) + return False - if isinstance(setting_value, dict): - return {re.compile(p): r for (p, r) in setting_value.items()} + result.filter_urls(filter_url_field) - return None + if isinstance(result, (MainResult, LegacyResult)): + for pattern in LOW: + if result.parsed_url and pattern.search(result.parsed_url.netloc): + result.priority = "low" + for pattern in HIGH: + if result.parsed_url and pattern.search(result.parsed_url.netloc): + result.priority = "high" -replacements: dict = _load_regular_expressions('replace') or {} # type: ignore -removables: set = _load_regular_expressions('remove') or set() # type: ignore -high_priority: set = _load_regular_expressions('high_priority') or set() # type: ignore -low_priority: set = _load_regular_expressions('low_priority') or set() # type: ignore + return True + def init(self, app: "flask.Flask") -> bool: # pylint: disable=unused-argument + global REPLACE, REMOVE, HIGH, LOW # pylint: disable=global-statement -def _matches_parsed_url(result, pattern): - return result[parsed] and (parsed in result and pattern.search(result[parsed].netloc)) + if not settings.get(self.id): + # Remove plugin, if there isn't a "hostnames:" setting + return False + REPLACE = self._load_regular_expressions("replace") or {} # type: ignore + REMOVE = self._load_regular_expressions("remove") or set() # type: ignore + HIGH = self._load_regular_expressions("high_priority") or set() # type: ignore + LOW = self._load_regular_expressions("low_priority") or set() # type: ignore -def on_result(_request, _search, result) -> bool: - for pattern, replacement in replacements.items(): - if _matches_parsed_url(result, pattern): - # logger.debug(result['url']) - 
result[parsed] = result[parsed]._replace(netloc=pattern.sub(replacement, result[parsed].netloc)) - result['url'] = urlunparse(result[parsed]) - # logger.debug(result['url']) + return True - for url_field in _url_fields: - if not getattr(result, url_field, None): - continue + def _load_regular_expressions(self, settings_key) -> dict[re.Pattern, str] | set | None: + setting_value = settings.get(self.id, {}).get(settings_key) - url_src = urlparse(result[url_field]) - if pattern.search(url_src.netloc): - url_src = url_src._replace(netloc=pattern.sub(replacement, url_src.netloc)) - result[url_field] = urlunparse(url_src) + if not setting_value: + return None - for pattern in removables: - if _matches_parsed_url(result, pattern): - return False + # load external file with configuration + if isinstance(setting_value, str): + setting_value = get_yaml_cfg(setting_value) - for url_field in _url_fields: - if not getattr(result, url_field, None): - continue + if isinstance(setting_value, list): + return {re.compile(r) for r in setting_value} - url_src = urlparse(result[url_field]) - if pattern.search(url_src.netloc): - del result[url_field] + if isinstance(setting_value, dict): + return {re.compile(p): r for (p, r) in setting_value.items()} - for pattern in low_priority: - if _matches_parsed_url(result, pattern): - result['priority'] = 'low' + return None + + +def filter_url_field(result: "Result|LegacyResult", field_name: str, url_src: str) -> bool | str: + """Returns bool ``True`` to use URL unchanged (``False`` to ignore URL). 
+ If URL should be modified, the returned string is the new URL to use.""" + + if not url_src: + log.debug("missing a URL in field %s", field_name) + return True + + url_src_parsed = urlparse(url=url_src) + + for pattern in REMOVE: + if pattern.search(url_src_parsed.netloc): + return False - for pattern in high_priority: - if _matches_parsed_url(result, pattern): - result['priority'] = 'high' + for pattern, replacement in REPLACE.items(): + if pattern.search(url_src_parsed.netloc): + new_url = url_src_parsed._replace(netloc=pattern.sub(replacement, url_src_parsed.netloc)) + new_url = urlunparse(new_url) + return new_url return True diff --git a/searx/plugins/oa_doi_rewrite.py b/searx/plugins/oa_doi_rewrite.py index be5a8d4a4..dac60d298 100644 --- a/searx/plugins/oa_doi_rewrite.py +++ b/searx/plugins/oa_doi_rewrite.py @@ -1,54 +1,90 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # pylint: disable=missing-module-docstring - from __future__ import annotations +import typing + import re -from urllib.parse import urlparse, parse_qsl +from urllib.parse import parse_qsl from flask_babel import gettext +from searx import get_setting +from searx.plugins import Plugin, PluginInfo +from searx.extended_types import sxng_request -from searx import settings +from ._core import log +if typing.TYPE_CHECKING: + from searx.search import SearchWithPlugins + from searx.extended_types import SXNG_Request + from searx.result_types import Result, LegacyResult + from searx.plugins import PluginCfg -regex = re.compile(r'10\.\d{4,9}/[^\s]+') -name = gettext('Open Access DOI rewrite') -description = gettext('Avoid paywalls by redirecting to open-access versions of publications when available') -default_on = False -preference_section = 'general/doi_resolver' +ahmia_blacklist: list = [] -def extract_doi(url): - match = regex.search(url.path) - if match: - return match.group(0) - for _, v in parse_qsl(url.query): - match = regex.search(v) - if match: - return match.group(0) - return None +def 
filter_url_field(result: "Result|LegacyResult", field_name: str, url_src: str) -> bool | str: + """Returns bool ``True`` to use URL unchanged (``False`` to ignore URL). + If URL should be modified, the returned string is the new URL to use.""" + if field_name != "url": + return True # use it unchanged -def get_doi_resolver(preferences): - doi_resolvers = settings['doi_resolvers'] - selected_resolver = preferences.get_value('doi_resolver')[0] - if selected_resolver not in doi_resolvers: - selected_resolver = settings['default_doi_resolver'] - return doi_resolvers[selected_resolver] + doi = extract_doi(result.parsed_url) + if doi and len(doi) < 50: + for suffix in ("/", ".pdf", ".xml", "/full", "/meta", "/abstract"): + doi = doi.removesuffix(suffix) + new_url = get_doi_resolver() + doi + if "doi" not in result: + result["doi"] = doi + log.debug("oa_doi_rewrite: [URL field: %s] %s -> %s", field_name, url_src, new_url) + return new_url # use new url + + return True # use it unchanged + + +class SXNGPlugin(Plugin): + """Avoid paywalls by redirecting to open-access.""" + id = "oa_doi_rewrite" -def on_result(request, _search, result) -> bool: + def __init__(self, plg_cfg: "PluginCfg") -> None: + super().__init__(plg_cfg) + self.info = PluginInfo( + id=self.id, + name=gettext("Open Access DOI rewrite"), + description=gettext("Avoid paywalls by redirecting to open-access versions of publications when available"), + preference_section="general", + ) - if not result.parsed_url: + def on_result( + self, + request: "SXNG_Request", + search: "SearchWithPlugins", + result: "Result", + ) -> bool: # pylint: disable=unused-argument + if result.parsed_url: + result.filter_urls(filter_url_field) return True - doi = extract_doi(result['parsed_url']) - if doi and len(doi) < 50: - for suffix in ('/', '.pdf', '.xml', '/full', '/meta', '/abstract'): - if doi.endswith(suffix): - doi = doi[: -len(suffix)] - result['url'] = get_doi_resolver(request.preferences) + doi - result['parsed_url'] = 
urlparse(result['url']) - if 'doi' not in result: - result['doi'] = doi - return True + +regex = re.compile(r'10\.\d{4,9}/[^\s]+') + + +def extract_doi(url): + m = regex.search(url.path) + if m: + return m.group(0) + for _, v in parse_qsl(url.query): + m = regex.search(v) + if m: + return m.group(0) + return None + + +def get_doi_resolver() -> str: + doi_resolvers = get_setting("doi_resolvers") + selected_resolver = sxng_request.preferences.get_value('doi_resolver')[0] + if selected_resolver not in doi_resolvers: + selected_resolver = get_setting("default_doi_resolver") + return doi_resolvers[selected_resolver] diff --git a/searx/plugins/self_info.py b/searx/plugins/self_info.py index f5498e480..ef035e683 100644 --- a/searx/plugins/self_info.py +++ b/searx/plugins/self_info.py @@ -14,6 +14,7 @@ from . import Plugin, PluginInfo if typing.TYPE_CHECKING: from searx.search import SearchWithPlugins from searx.extended_types import SXNG_Request + from . import PluginCfg class SXNGPlugin(Plugin): @@ -23,11 +24,10 @@ class SXNGPlugin(Plugin): """ id = "self_info" - default_on = True keywords = ["ip", "user-agent"] - def __init__(self): - super().__init__() + def __init__(self, plg_cfg: "PluginCfg"): + super().__init__(plg_cfg) self.ip_regex = re.compile(r"^ip", re.IGNORECASE) self.ua_regex = re.compile(r"^user-agent", re.IGNORECASE) diff --git a/searx/plugins/tor_check.py b/searx/plugins/tor_check.py index e719207bf..6a24714c3 100644 --- a/searx/plugins/tor_check.py +++ b/searx/plugins/tor_check.py @@ -3,47 +3,24 @@ user searches for ``tor-check``. It fetches the tor exit node list from :py:obj:`url_exit_list` and parses all the IPs into a list, then checks if the user's IP address is in it. - -Enable in ``settings.yml``: - -.. code:: yaml - - enabled_plugins: - .. 
- - 'Tor check plugin' - """ - from __future__ import annotations +import typing import re from flask_babel import gettext from httpx import HTTPError from searx.network import get +from searx.plugins import Plugin, PluginInfo from searx.result_types import EngineResults from searx.botdetection import get_real_ip +if typing.TYPE_CHECKING: + from searx.search import SearchWithPlugins + from searx.extended_types import SXNG_Request + from searx.plugins import PluginCfg -default_on = False - -name = gettext("Tor check plugin") -'''Translated name of the plugin''' - -description = gettext( - "This plugin checks if the address of the request is a Tor exit-node, and" - " informs the user if it is; like check.torproject.org, but from SearXNG." -) -'''Translated description of the plugin.''' - -preference_section = 'query' -'''The preference section where the plugin is shown.''' - -query_keywords = ['tor-check'] -'''Query keywords shown in the preferences.''' - -query_examples = '' -'''Query examples shown in the preferences.''' # Regex for exit node addresses in the list. reg = re.compile(r"(?<=ExitAddress )\S+") @@ -52,33 +29,51 @@ url_exit_list = "https://check.torproject.org/exit-addresses" """URL to load Tor exit list from.""" -def post_search(request, search) -> EngineResults: - results = EngineResults() +class SXNGPlugin(Plugin): + """Rewrite hostnames, remove results or prioritize them.""" - if search.search_query.pageno > 1: - return results + id = "tor_check" + keywords = ["tor-check"] - if search.search_query.query.lower() == "tor-check": + def __init__(self, plg_cfg: "PluginCfg") -> None: + super().__init__(plg_cfg) + self.info = PluginInfo( + id=self.id, + name=gettext("Tor check plugin"), + description=gettext( + "This plugin checks if the address of the request is a Tor exit-node, and" + " informs the user if it is; like check.torproject.org, but from SearXNG." + ), + preference_section="query", + ) - # Request the list of tor exit nodes. 
- try: - resp = get(url_exit_list) - node_list = re.findall(reg, resp.text) # type: ignore + def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> EngineResults: + results = EngineResults() - except HTTPError: - # No answer, return error - msg = gettext("Could not download the list of Tor exit-nodes from") - results.add(results.types.Answer(answer=f"{msg} {url_exit_list}")) + if search.search_query.pageno > 1: return results - real_ip = get_real_ip(request) + if search.search_query.query.lower() == "tor-check": + + # Request the list of tor exit nodes. + try: + resp = get(url_exit_list) + node_list = re.findall(reg, resp.text) # type: ignore - if real_ip in node_list: - msg = gettext("You are using Tor and it looks like you have the external IP address") - results.add(results.types.Answer(answer=f"{msg} {real_ip}")) + except HTTPError: + # No answer, return error + msg = gettext("Could not download the list of Tor exit-nodes from") + results.add(results.types.Answer(answer=f"{msg} {url_exit_list}")) + return results - else: - msg = gettext("You are not using Tor and you have the external IP address") - results.add(results.types.Answer(answer=f"{msg} {real_ip}")) + real_ip = get_real_ip(request) - return results + if real_ip in node_list: + msg = gettext("You are using Tor and it looks like you have the external IP address") + results.add(results.types.Answer(answer=f"{msg} {real_ip}")) + + else: + msg = gettext("You are not using Tor and you have the external IP address") + results.add(results.types.Answer(answer=f"{msg} {real_ip}")) + + return results diff --git a/searx/plugins/tracker_url_remover.py b/searx/plugins/tracker_url_remover.py index f33f7fdfd..d9c767a36 100644 --- a/searx/plugins/tracker_url_remover.py +++ b/searx/plugins/tracker_url_remover.py @@ -2,12 +2,21 @@ # pylint: disable=missing-module-docstring from __future__ import annotations +import typing import re from urllib.parse import urlunparse, parse_qsl, urlencode from 
flask_babel import gettext +from searx.plugins import Plugin, PluginInfo + +if typing.TYPE_CHECKING: + from searx.search import SearchWithPlugins + from searx.extended_types import SXNG_Request + from searx.result_types import Result + from searx.plugins import PluginCfg + regexes = { re.compile(r'utm_[^&]+'), re.compile(r'(wkey|wemail)[^&]*'), @@ -15,30 +24,35 @@ regexes = { re.compile(r'&$'), } -name = gettext('Tracker URL remover') -description = gettext('Remove trackers arguments from the returned URL') -default_on = True -preference_section = 'privacy' - -def on_result(_request, _search, result) -> bool: +class SXNGPlugin(Plugin): + """Remove trackers arguments from the returned URL""" + + id = "tracker_url_remover" + + def __init__(self, plg_cfg: "PluginCfg") -> None: + super().__init__(plg_cfg) + self.info = PluginInfo( + id=self.id, + name=gettext("Tracker URL remover"), + description=gettext("Remove trackers arguments from the returned URL"), + preference_section="privacy", + ) + + def on_result( + self, request: "SXNG_Request", search: "SearchWithPlugins", result: Result + ) -> bool: # pylint: disable=unused-argument + if not result.parsed_url: + return True + + parsed_query: list[tuple[str, str]] = parse_qsl(result.parsed_url.query) + for name_value in list(parsed_query): + param_name = name_value[0] + for reg in regexes: + if reg.match(param_name): + parsed_query.remove(name_value) + result.parsed_url = result.parsed_url._replace(query=urlencode(parsed_query)) + result.url = urlunparse(result.parsed_url) + break - parsed_url = getattr(result, "parsed_url", None) - if not parsed_url: return True - - if parsed_url.query == "": - return True - - parsed_query = parse_qsl(parsed_url.query) - changes = 0 - for i, (param_name, _) in enumerate(list(parsed_query)): - for reg in regexes: - if reg.match(param_name): - parsed_query.pop(i - changes) - changes += 1 - result.parsed_url = result.parsed_url._replace(query=urlencode(parsed_query)) - result.url = 
urlunparse(result.parsed_url) - break - - return True diff --git a/searx/plugins/unit_converter.py b/searx/plugins/unit_converter.py index 3b9f98945..2bab598f2 100644 --- a/searx/plugins/unit_converter.py +++ b/searx/plugins/unit_converter.py @@ -7,36 +7,74 @@ converters, each converter is one item in the list (compare :py:obj:`ADDITIONAL_UNITS`). If the symbols are ambiguous, the matching units of measurement are evaluated. The weighting in the evaluation results from the sorting of the :py:obj:`list of unit converters<symbol_to_si>`. - -Enable in ``settings.yml``: - -.. code:: yaml - - enabled_plugins: - .. - - 'Unit converter plugin' - """ - from __future__ import annotations +import typing import re import babel.numbers from flask_babel import gettext, get_locale from searx import data +from searx.plugins import Plugin, PluginInfo from searx.result_types import EngineResults +if typing.TYPE_CHECKING: + from searx.search import SearchWithPlugins + from searx.extended_types import SXNG_Request + from searx.plugins import PluginCfg + -name = "Unit converter plugin" -description = gettext("Convert between units") -default_on = True +name = "" +description = gettext("") -plugin_id = "unit_converter" -preference_section = "general" +plugin_id = "" +preference_section = "" CONVERT_KEYWORDS = ["in", "to", "as"] + +class SXNGPlugin(Plugin): + """Convert between units. The result is displayed in area for the + "answers". 
+ """ + + id = "unit_converter" + + def __init__(self, plg_cfg: "PluginCfg") -> None: + super().__init__(plg_cfg) + + self.info = PluginInfo( + id=self.id, + name=gettext("Unit converter plugin"), + description=gettext("Convert between units"), + preference_section="general", + ) + + def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> EngineResults: + results = EngineResults() + + # only convert between units on the first page + if search.search_query.pageno > 1: + return results + + query = search.search_query.query + query_parts = query.split(" ") + + if len(query_parts) < 3: + return results + + for query_part in query_parts: + for keyword in CONVERT_KEYWORDS: + if query_part == keyword: + from_query, to_query = query.split(keyword, 1) + target_val = _parse_text_and_convert(from_query.strip(), to_query.strip()) + if target_val: + results.add(results.types.Answer(answer=target_val)) + + return results + + # inspired from https://stackoverflow.com/a/42475086 RE_MEASURE = r''' (?P<sign>[-+]?) # +/- or nothing for positive @@ -243,27 +281,3 @@ def _parse_text_and_convert(from_query, to_query) -> str | None: result = babel.numbers.format_decimal(value, locale=_locale, format='#,##0.##########;-#') return f'{result} {target_symbol}' - - -def post_search(_request, search) -> EngineResults: - results = EngineResults() - - # only convert between units on the first page - if search.search_query.pageno > 1: - return results - - query = search.search_query.query - query_parts = query.split(" ") - - if len(query_parts) < 3: - return results - - for query_part in query_parts: - for keyword in CONVERT_KEYWORDS: - if query_part == keyword: - from_query, to_query = query.split(keyword, 1) - target_val = _parse_text_and_convert(from_query.strip(), to_query.strip()) - if target_val: - results.add(results.types.Answer(answer=target_val)) - - return results |