summaryrefslogtreecommitdiff
path: root/searx/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'searx/plugins')
-rw-r--r--searx/plugins/__init__.py292
-rw-r--r--searx/plugins/_core.py394
-rw-r--r--searx/plugins/ahmia_filter.py14
-rw-r--r--searx/plugins/calculator.py70
-rw-r--r--searx/plugins/hash_plugin.py97
-rw-r--r--searx/plugins/hostnames.py21
-rw-r--r--searx/plugins/oa_doi_rewrite.py10
-rw-r--r--searx/plugins/self_info.py75
-rw-r--r--searx/plugins/tor_check.py53
-rw-r--r--searx/plugins/tracker_url_remover.py18
-rw-r--r--searx/plugins/unit_converter.py30
11 files changed, 689 insertions, 385 deletions
diff --git a/searx/plugins/__init__.py b/searx/plugins/__init__.py
index c3aad5f32..9aaf2f2db 100644
--- a/searx/plugins/__init__.py
+++ b/searx/plugins/__init__.py
@@ -1,232 +1,68 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
-# pylint: disable=missing-module-docstring, missing-class-docstring
-
-import sys
-from hashlib import sha256
-from importlib import import_module
-from os import listdir, makedirs, remove, stat, utime
-from os.path import abspath, basename, dirname, exists, join
-from shutil import copyfile
-from pkgutil import iter_modules
-from logging import getLogger
-from typing import List, Tuple
-
-from searx import logger, settings
-
-
-class Plugin: # pylint: disable=too-few-public-methods
- """This class is currently never initialized and only used for type hinting."""
-
- id: str
- name: str
- description: str
- default_on: bool
- js_dependencies: Tuple[str]
- css_dependencies: Tuple[str]
- preference_section: str
-
-
-logger = logger.getChild("plugins")
-
-required_attrs = (
- # fmt: off
- ("name", str),
- ("description", str),
- ("default_on", bool)
- # fmt: on
-)
-
-optional_attrs = (
- # fmt: off
- ("js_dependencies", tuple),
- ("css_dependencies", tuple),
- ("preference_section", str),
- # fmt: on
-)
-
-
-def sha_sum(filename):
- with open(filename, "rb") as f:
- file_content_bytes = f.read()
- return sha256(file_content_bytes).hexdigest()
-
-
-def sync_resource(base_path, resource_path, name, target_dir, plugin_dir):
- dep_path = join(base_path, resource_path)
- file_name = basename(dep_path)
- resource_path = join(target_dir, file_name)
- if not exists(resource_path) or sha_sum(dep_path) != sha_sum(resource_path):
- try:
- copyfile(dep_path, resource_path)
- # copy atime_ns and mtime_ns, so the weak ETags (generated by
- # the HTTP server) do not change
- dep_stat = stat(dep_path)
- utime(resource_path, ns=(dep_stat.st_atime_ns, dep_stat.st_mtime_ns))
- except IOError:
- logger.critical("failed to copy plugin resource {0} for plugin {1}".format(file_name, name))
- sys.exit(3)
-
- # returning with the web path of the resource
- return join("plugins/external_plugins", plugin_dir, file_name)
-
-
-def prepare_package_resources(plugin, plugin_module_name):
- plugin_base_path = dirname(abspath(plugin.__file__))
-
- plugin_dir = plugin_module_name
- target_dir = join(settings["ui"]["static_path"], "plugins/external_plugins", plugin_dir)
- try:
- makedirs(target_dir, exist_ok=True)
- except IOError:
- logger.critical("failed to create resource directory {0} for plugin {1}".format(target_dir, plugin_module_name))
- sys.exit(3)
-
- resources = []
-
- if hasattr(plugin, "js_dependencies"):
- resources.extend(map(basename, plugin.js_dependencies))
- plugin.js_dependencies = [
- sync_resource(plugin_base_path, x, plugin_module_name, target_dir, plugin_dir)
- for x in plugin.js_dependencies
- ]
-
- if hasattr(plugin, "css_dependencies"):
- resources.extend(map(basename, plugin.css_dependencies))
- plugin.css_dependencies = [
- sync_resource(plugin_base_path, x, plugin_module_name, target_dir, plugin_dir)
- for x in plugin.css_dependencies
- ]
-
- for f in listdir(target_dir):
- if basename(f) not in resources:
- resource_path = join(target_dir, basename(f))
- try:
- remove(resource_path)
- except IOError:
- logger.critical(
- "failed to remove unused resource file {0} for plugin {1}".format(resource_path, plugin_module_name)
- )
- sys.exit(3)
-
-
-def load_plugin(plugin_module_name, external):
- # pylint: disable=too-many-branches
- try:
- plugin = import_module(plugin_module_name)
- except (
- SyntaxError,
- KeyboardInterrupt,
- SystemExit,
- SystemError,
- ImportError,
- RuntimeError,
- ) as e:
- logger.critical("%s: fatal exception", plugin_module_name, exc_info=e)
- sys.exit(3)
- except BaseException:
- logger.exception("%s: exception while loading, the plugin is disabled", plugin_module_name)
- return None
-
- # difference with searx: use module name instead of the user name
- plugin.id = plugin_module_name
-
- #
- plugin.logger = getLogger(plugin_module_name)
-
- for plugin_attr, plugin_attr_type in required_attrs:
- if not hasattr(plugin, plugin_attr):
- logger.critical('%s: missing attribute "%s", cannot load plugin', plugin, plugin_attr)
- sys.exit(3)
- attr = getattr(plugin, plugin_attr)
- if not isinstance(attr, plugin_attr_type):
- type_attr = str(type(attr))
- logger.critical(
- '{1}: attribute "{0}" is of type {2}, must be of type {3}, cannot load plugin'.format(
- plugin, plugin_attr, type_attr, plugin_attr_type
- )
- )
- sys.exit(3)
-
- for plugin_attr, plugin_attr_type in optional_attrs:
- if not hasattr(plugin, plugin_attr) or not isinstance(getattr(plugin, plugin_attr), plugin_attr_type):
- setattr(plugin, plugin_attr, plugin_attr_type())
-
- if not hasattr(plugin, "preference_section"):
- plugin.preference_section = "general"
-
- # query plugin
- if plugin.preference_section == "query":
- for plugin_attr in ("query_keywords", "query_examples"):
- if not hasattr(plugin, plugin_attr):
- logger.critical('missing attribute "{0}", cannot load plugin: {1}'.format(plugin_attr, plugin))
- sys.exit(3)
-
- if settings.get("enabled_plugins"):
- # searx compatibility: plugin.name in settings['enabled_plugins']
- plugin.default_on = plugin.name in settings["enabled_plugins"] or plugin.id in settings["enabled_plugins"]
-
- # copy resources if this is an external plugin
- if external:
- prepare_package_resources(plugin, plugin_module_name)
-
- logger.debug("%s: loaded", plugin_module_name)
-
- return plugin
-
-
-def load_and_initialize_plugin(plugin_module_name, external, init_args):
- plugin = load_plugin(plugin_module_name, external)
- if plugin and hasattr(plugin, 'init'):
- try:
- return plugin if plugin.init(*init_args) else None
- except Exception: # pylint: disable=broad-except
- plugin.logger.exception("Exception while calling init, the plugin is disabled")
- return None
- return plugin
-
-
-class PluginStore:
- def __init__(self):
- self.plugins: List[Plugin] = []
-
- def __iter__(self):
- yield from self.plugins
-
- def register(self, plugin):
- self.plugins.append(plugin)
-
- def call(self, ordered_plugin_list, plugin_type, *args, **kwargs):
- ret = True
- for plugin in ordered_plugin_list:
- if hasattr(plugin, plugin_type):
- try:
- ret = getattr(plugin, plugin_type)(*args, **kwargs)
- if not ret:
- break
- except Exception: # pylint: disable=broad-except
- plugin.logger.exception("Exception while calling %s", plugin_type)
- return ret
-
-
-plugins = PluginStore()
-
-
-def plugin_module_names():
- yield_plugins = set()
-
- # embedded plugins
- for module in iter_modules(path=[dirname(__file__)]):
- yield (__name__ + "." + module.name, False)
- yield_plugins.add(module.name)
- # external plugins
- for module_name in settings['plugins']:
- if module_name not in yield_plugins:
- yield (module_name, True)
- yield_plugins.add(module_name)
+""".. sidebar:: Further reading ..
+
+ - :ref:`plugins admin`
+ - :ref:`SearXNG settings <settings plugins>`
+ - :ref:`builtin plugins`
+
+Plugins can extend or replace functionality of various components of SearXNG.
+Here is an example of a very simple plugin that adds a "Hello" into the answer
+area:
+
+.. code:: python
+
+ from flask_babel import gettext as _
+ from searx.plugins import Plugin
+ from searx.result_types import Answer
+
+ class MyPlugin(Plugin):
+
+ id = "self_info"
+ default_on = True
+
+ def __init__(self):
+ super().__init__()
+ info = PluginInfo(id=self.id, name=_("Hello"), description=_("demo plugin"))
+
+ def post_search(self, request, search):
+ return [ Answer(answer="Hello") ]
+
+Entry points (hooks) define when a plugin runs. Right now only three hooks are
+implemented. So feel free to implement a hook if it fits the behaviour of your
+plugin / a plugin doesn't need to implement all the hooks.
+
+- pre search: :py:obj:`Plugin.pre_search`
+- post search: :py:obj:`Plugin.post_search`
+- on each result item: :py:obj:`Plugin.on_result`
+
+For a coding example have a look at :ref:`self_info plugin`.
+
+----
+
+.. autoclass:: Plugin
+ :members:
+
+.. autoclass:: PluginInfo
+ :members:
+
+.. autoclass:: PluginStorage
+ :members:
+
+.. autoclass:: searx.plugins._core.ModulePlugin
+ :members:
+ :show-inheritance:
+
+"""
+
+from __future__ import annotations
+
+__all__ = ["PluginInfo", "Plugin", "PluginStorage"]
+
+from ._core import PluginInfo, Plugin, PluginStorage
+
+STORAGE: PluginStorage = PluginStorage()
def initialize(app):
- for module_name, external in plugin_module_names():
- plugin = load_and_initialize_plugin(module_name, external, (app, settings))
- if plugin:
- plugins.register(plugin)
+ STORAGE.load_builtins()
+ STORAGE.init(app)
diff --git a/searx/plugins/_core.py b/searx/plugins/_core.py
new file mode 100644
index 000000000..70e5758ec
--- /dev/null
+++ b/searx/plugins/_core.py
@@ -0,0 +1,394 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+# pylint: disable=too-few-public-methods,missing-module-docstring
+
+from __future__ import annotations
+
+__all__ = ["PluginInfo", "Plugin", "PluginStorage"]
+
+import abc
+import importlib
+import logging
+import pathlib
+import types
+import typing
+import warnings
+
+from dataclasses import dataclass, field
+
+import flask
+
+import searx
+from searx.utils import load_module
+from searx.extended_types import SXNG_Request
+from searx.result_types import Result
+
+
+if typing.TYPE_CHECKING:
+ from searx.search import SearchWithPlugins
+
+
+_default = pathlib.Path(__file__).parent
+log: logging.Logger = logging.getLogger("searx.plugins")
+
+
+@dataclass
+class PluginInfo:
+ """Object that holds informations about a *plugin*, these infos are shown to
+ the user in the Preferences menu.
+
+ To be able to translate the information into other languages, the text must
+ be written in English and translated with :py:obj:`flask_babel.gettext`.
+ """
+
+ id: str
+ """The ID-selector in HTML/CSS `#<id>`."""
+
+ name: str
+ """Name of the *plugin*."""
+
+ description: str
+ """Short description of the *answerer*."""
+
+ preference_section: typing.Literal["general", "ui", "privacy", "query"] | None = "general"
+ """Section (tab/group) in the preferences where this plugin is shown to the
+ user.
+
+ The value ``query`` is reserved for plugins that are activated via a
+ *keyword* as part of a search query, see:
+
+ - :py:obj:`PluginInfo.examples`
+ - :py:obj:`Plugin.keywords`
+
+ Those plugins are shown in the preferences in tab *Special Queries*.
+ """
+
+ examples: list[str] = field(default_factory=list)
+ """List of short examples of the usage / of query terms."""
+
+ keywords: list[str] = field(default_factory=list)
+ """See :py:obj:`Plugin.keywords`"""
+
+
+class Plugin(abc.ABC):
+ """Abstract base class of all Plugins."""
+
+ id: typing.ClassVar[str]
+ """The ID (suffix) in the HTML form."""
+
+ default_on: typing.ClassVar[bool]
+ """Plugin is enabled/disabled by default."""
+
+ keywords: list[str] = []
+ """Keywords in the search query that activate the plugin. The *keyword* is
+ the first word in a search query. If a plugin should be executed regardless
+ of the search query, the list of keywords should be empty (which is also the
+ default in the base class for Plugins)."""
+
+ log: logging.Logger
+ """A logger object, is automatically initialized when calling the
+ constructor (if not already set in the subclass)."""
+
+ info: PluginInfo
+ """Informations about the *plugin*, see :py:obj:`PluginInfo`."""
+
+ def __init__(self) -> None:
+ super().__init__()
+
+ for attr in ["id", "default_on"]:
+ if getattr(self, attr, None) is None:
+ raise NotImplementedError(f"plugin {self} is missing attribute {attr}")
+
+ if not self.id:
+ self.id = f"{self.__class__.__module__}.{self.__class__.__name__}"
+ if not getattr(self, "log", None):
+ self.log = log.getChild(self.id)
+
+ def __hash__(self) -> int:
+ """The hash value is used in :py:obj:`set`, for example, when an object
+ is added to the set. The hash value is also used in other contexts,
+ e.g. when checking for equality to identify identical plugins from
+ different sources (name collisions)."""
+
+ return id(self)
+
+ def __eq__(self, other):
+ """py:obj:`Plugin` objects are equal if the hash values of the two
+ objects are equal."""
+
+ return hash(self) == hash(other)
+
+ def init(self, app: flask.Flask) -> bool: # pylint: disable=unused-argument
+ """Initialization of the plugin, the return value decides whether this
+ plugin is active or not. Initialization only takes place once, at the
+ time the WEB application is set up. The base methode always returns
+ ``True``, the methode can be overwritten in the inheritances,
+
+ - ``True`` plugin is active
+ - ``False`` plugin is inactive
+ """
+ return True
+
+ # pylint: disable=unused-argument
+ def pre_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> bool:
+ """Runs BEFORE the search request and returns a boolean:
+
+ - ``True`` to continue the search
+ - ``False`` to stop the search
+ """
+ return True
+
+ def on_result(self, request: SXNG_Request, search: "SearchWithPlugins", result: Result) -> bool:
+ """Runs for each result of each engine and returns a boolean:
+
+ - ``True`` to keep the result
+ - ``False`` to remove the result from the result list
+
+ The ``result`` can be modified to the needs.
+
+ .. hint::
+
+ If :py:obj:`Result.url` is modified, :py:obj:`Result.parsed_url` must
+ be changed accordingly:
+
+ .. code:: python
+
+ result["parsed_url"] = urlparse(result["url"])
+ """
+ return True
+
+ def post_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> None | typing.Sequence[Result]:
+ """Runs AFTER the search request. Can return a list of :py:obj:`Result`
+ objects to be added to the final result list."""
+ return
+
+
+class ModulePlugin(Plugin):
+ """A wrapper class for legacy *plugins*.
+
+ .. note::
+
+ For internal use only!
+
+ In a module plugin, the follwing names are mapped:
+
+ - `module.query_keywords` --> :py:obj:`Plugin.keywords`
+ - `module.plugin_id` --> :py:obj:`Plugin.id`
+ - `module.logger` --> :py:obj:`Plugin.log`
+ """
+
+ _required_attrs = (("name", str), ("description", str), ("default_on", bool))
+
+ def __init__(self, mod: types.ModuleType):
+ """In case of missing attributes in the module or wrong types are given,
+ a :py:obj:`TypeError` exception is raised."""
+
+ self.module = mod
+ self.id = getattr(self.module, "plugin_id", self.module.__name__)
+ self.log = logging.getLogger(self.module.__name__)
+ self.keywords = getattr(self.module, "query_keywords", [])
+
+ for attr, attr_type in self._required_attrs:
+ if not hasattr(self.module, attr):
+ msg = f"missing attribute {attr}, cannot load plugin"
+ self.log.critical(msg)
+ raise TypeError(msg)
+ if not isinstance(getattr(self.module, attr), attr_type):
+ msg = f"attribute {attr} is not of type {attr_type}"
+ self.log.critical(msg)
+ raise TypeError(msg)
+
+ self.default_on = mod.default_on
+ self.info = PluginInfo(
+ id=self.id,
+ name=self.module.name,
+ description=self.module.description,
+ preference_section=getattr(self.module, "preference_section", None),
+ examples=getattr(self.module, "query_examples", []),
+ keywords=self.keywords,
+ )
+
+ # monkeypatch module
+ self.module.logger = self.log # type: ignore
+
+ super().__init__()
+
+ def init(self, app: flask.Flask) -> bool:
+ if not hasattr(self.module, "init"):
+ return True
+ return self.module.init(app)
+
+ def pre_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> bool:
+ if not hasattr(self.module, "pre_search"):
+ return True
+ return self.module.pre_search(request, search)
+
+ def on_result(self, request: SXNG_Request, search: "SearchWithPlugins", result: Result) -> bool:
+ if not hasattr(self.module, "on_result"):
+ return True
+ return self.module.on_result(request, search, result)
+
+ def post_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> None | list[Result]:
+ if not hasattr(self.module, "post_search"):
+ return None
+ return self.module.post_search(request, search)
+
+
+class PluginStorage:
+ """A storage for managing the *plugins* of SearXNG."""
+
+ plugin_list: set[Plugin]
+ """The list of :py:obj:`Plugins` in this storage."""
+
+ legacy_plugins = [
+ "ahmia_filter",
+ "calculator",
+ "hostnames",
+ "oa_doi_rewrite",
+ "tor_check",
+ "tracker_url_remover",
+ "unit_converter",
+ ]
+ """Internal plugins implemented in the legacy style (as module / deprecated!)."""
+
+ def __init__(self):
+ self.plugin_list = set()
+
+ def __iter__(self):
+
+ yield from self.plugin_list
+
+ def __len__(self):
+ return len(self.plugin_list)
+
+ @property
+ def info(self) -> list[PluginInfo]:
+ return [p.info for p in self.plugin_list]
+
+ def load_builtins(self):
+ """Load plugin modules from:
+
+ - the python packages in :origin:`searx/plugins` and
+ - the external plugins from :ref:`settings plugins`.
+ """
+
+ for f in _default.iterdir():
+
+ if f.name.startswith("_"):
+ continue
+
+ if f.stem not in self.legacy_plugins:
+ self.register_by_fqn(f"searx.plugins.{f.stem}.SXNGPlugin")
+ continue
+
+ # for backward compatibility
+ mod = load_module(f.name, str(f.parent))
+ self.register(ModulePlugin(mod))
+
+ for fqn in searx.get_setting("plugins"): # type: ignore
+ self.register_by_fqn(fqn)
+
+ def register(self, plugin: Plugin):
+ """Register a :py:obj:`Plugin`. In case of name collision (if two
+ plugins have same ID) a :py:obj:`KeyError` exception is raised.
+ """
+
+ if plugin in self.plugin_list:
+ msg = f"name collision '{plugin.id}'"
+ plugin.log.critical(msg)
+ raise KeyError(msg)
+
+ self.plugin_list.add(plugin)
+ plugin.log.debug("plugin has been loaded")
+
+ def register_by_fqn(self, fqn: str):
+ """Register a :py:obj:`Plugin` via its fully qualified class name (FQN).
+ The FQNs of external plugins could be read from a configuration, for
+ example, and registered using this method
+ """
+
+ mod_name, _, obj_name = fqn.rpartition('.')
+ if not mod_name:
+ # for backward compatibility
+ code_obj = importlib.import_module(fqn)
+ else:
+ mod = importlib.import_module(mod_name)
+ code_obj = getattr(mod, obj_name, None)
+
+ if code_obj is None:
+ msg = f"plugin {fqn} is not implemented"
+ log.critical(msg)
+ raise ValueError(msg)
+
+ if isinstance(code_obj, types.ModuleType):
+ # for backward compatibility
+ warnings.warn(
+ f"plugin {fqn} is implemented in a legacy module / migrate to searx.plugins.Plugin", DeprecationWarning
+ )
+ self.register(ModulePlugin(code_obj))
+ return
+
+ self.register(code_obj())
+
+ def init(self, app: flask.Flask) -> None:
+ """Calls the method :py:obj:`Plugin.init` of each plugin in this
+ storage. Depending on its return value, the plugin is removed from
+ *this* storage or not."""
+
+ for plg in self.plugin_list.copy():
+ if not plg.init(app):
+ self.plugin_list.remove(plg)
+
+ def pre_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> bool:
+
+ ret = True
+ for plugin in [p for p in self.plugin_list if p.id in search.user_plugins]:
+ try:
+ ret = bool(plugin.pre_search(request=request, search=search))
+ except Exception: # pylint: disable=broad-except
+ plugin.log.exception("Exception while calling pre_search")
+ continue
+ if not ret:
+ # skip this search on the first False from a plugin
+ break
+ return ret
+
+ def on_result(self, request: SXNG_Request, search: "SearchWithPlugins", result: Result) -> bool:
+
+ ret = True
+ for plugin in [p for p in self.plugin_list if p.id in search.user_plugins]:
+ try:
+ ret = bool(plugin.on_result(request=request, search=search, result=result))
+ except Exception: # pylint: disable=broad-except
+ plugin.log.exception("Exception while calling on_result")
+ continue
+ if not ret:
+ # ignore this result item on the first False from a plugin
+ break
+
+ return ret
+
+ def post_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> None:
+ """Extend :py:obj:`search.result_container
+ <searx.results.ResultContainer`> with result items from plugins listed
+ in :py:obj:`search.user_plugins <SearchWithPlugins.user_plugins>`.
+ """
+
+ keyword = None
+ for keyword in search.search_query.query.split():
+ if keyword:
+ break
+
+ for plugin in [p for p in self.plugin_list if p.id in search.user_plugins]:
+
+ if plugin.keywords:
+ # plugin with keywords: skip plugin if no keyword match
+ if keyword and keyword not in plugin.keywords:
+ continue
+ try:
+ results = plugin.post_search(request=request, search=search) or []
+ except Exception: # pylint: disable=broad-except
+ plugin.log.exception("Exception while calling post_search")
+ continue
+
+ # In case of *plugins* prefix ``plugin:`` is set, see searx.result_types.Result
+ search.result_container.extend(f"plugin: {plugin.id}", results)
diff --git a/searx/plugins/ahmia_filter.py b/searx/plugins/ahmia_filter.py
index bbf137103..c1252b147 100644
--- a/searx/plugins/ahmia_filter.py
+++ b/searx/plugins/ahmia_filter.py
@@ -1,27 +1,33 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=missing-module-docstring
+from __future__ import annotations
from hashlib import md5
+
+import flask
+
from searx.data import ahmia_blacklist_loader
+from searx import get_setting
+
name = "Ahmia blacklist"
description = "Filter out onion results that appear in Ahmia's blacklist. (See https://ahmia.fi/blacklist)"
default_on = True
preference_section = 'onions'
-ahmia_blacklist = None
+ahmia_blacklist: list = []
-def on_result(_request, _search, result):
+def on_result(_request, _search, result) -> bool:
if not result.get('is_onion') or not result.get('parsed_url'):
return True
result_hash = md5(result['parsed_url'].hostname.encode()).hexdigest()
return result_hash not in ahmia_blacklist
-def init(_app, settings):
+def init(app=flask.Flask) -> bool: # pylint: disable=unused-argument
global ahmia_blacklist # pylint: disable=global-statement
- if not settings['outgoing']['using_tor_proxy']:
+ if not get_setting("outgoing.using_tor_proxy"):
# disable the plugin
return False
ahmia_blacklist = ahmia_blacklist_loader()
diff --git a/searx/plugins/calculator.py b/searx/plugins/calculator.py
index e92ff9d91..162f3c3c6 100644
--- a/searx/plugins/calculator.py
+++ b/searx/plugins/calculator.py
@@ -1,28 +1,27 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
-"""Calculate mathematical expressions using ack#eval
+"""Calculate mathematical expressions using :py:obj`ast.parse` (mode="eval").
"""
+from __future__ import annotations
+from typing import Callable
+
import ast
import re
import operator
-from multiprocessing import Process, Queue
-from typing import Callable
+import multiprocessing
-import flask
import babel
+import babel.numbers
from flask_babel import gettext
-from searx.plugins import logger
+from searx.result_types import Answer
name = "Basic Calculator"
description = gettext("Calculate mathematical expressions via the search bar")
default_on = True
-
preference_section = 'general'
plugin_id = 'calculator'
-logger = logger.getChild(plugin_id)
-
operators: dict[type, Callable] = {
ast.Add: operator.add,
ast.Sub: operator.sub,
@@ -33,11 +32,17 @@ operators: dict[type, Callable] = {
ast.USub: operator.neg,
}
+# with multiprocessing.get_context("fork") we are ready for Py3.14 (by emulating
+# the old behavior "fork") but it will not solve the core problem of fork, nor
+# will it remove the deprecation warnings in py3.12 & py3.13. Issue is
+# ddiscussed here: https://github.com/searxng/searxng/issues/4159
+mp_fork = multiprocessing.get_context("fork")
+
def _eval_expr(expr):
"""
>>> _eval_expr('2^6')
- 4
+ 64
>>> _eval_expr('2**6')
64
>>> _eval_expr('1 + 2*3**(4^5) / (6 + -7)')
@@ -63,46 +68,49 @@ def _eval(node):
raise TypeError(node)
-def timeout_func(timeout, func, *args, **kwargs):
+def handler(q: multiprocessing.Queue, func, args, **kwargs): # pylint:disable=invalid-name
+ try:
+ q.put(func(*args, **kwargs))
+ except:
+ q.put(None)
+ raise
+
- def handler(q: Queue, func, args, **kwargs): # pylint:disable=invalid-name
- try:
- q.put(func(*args, **kwargs))
- except:
- q.put(None)
- raise
+def timeout_func(timeout, func, *args, **kwargs):
- que = Queue()
- p = Process(target=handler, args=(que, func, args), kwargs=kwargs)
+ que = mp_fork.Queue()
+ p = mp_fork.Process(target=handler, args=(que, func, args), kwargs=kwargs)
p.start()
p.join(timeout=timeout)
ret_val = None
+ # pylint: disable=used-before-assignment,undefined-variable
if not p.is_alive():
ret_val = que.get()
else:
- logger.debug("terminate function after timeout is exceeded")
+ logger.debug("terminate function after timeout is exceeded") # type: ignore
p.terminate()
p.join()
p.close()
return ret_val
-def post_search(_request, search):
+def post_search(request, search) -> list[Answer]:
+ results = []
# only show the result of the expression on the first page
if search.search_query.pageno > 1:
- return True
+ return results
query = search.search_query.query
# in order to avoid DoS attacks with long expressions, ignore long expressions
if len(query) > 100:
- return True
+ return results
# replace commonly used math operators with their proper Python operator
query = query.replace("x", "*").replace(":", "/")
# use UI language
- ui_locale = babel.Locale.parse(flask.request.preferences.get_value('locale'), sep='-')
+ ui_locale = babel.Locale.parse(request.preferences.get_value('locale'), sep='-')
# parse the number system in a localized way
def _decimal(match: re.Match) -> str:
@@ -116,15 +124,17 @@ def post_search(_request, search):
# only numbers and math operators are accepted
if any(str.isalpha(c) for c in query):
- return True
+ return results
# in python, powers are calculated via **
query_py_formatted = query.replace("^", "**")
# Prevent the runtime from being longer than 50 ms
- result = timeout_func(0.05, _eval_expr, query_py_formatted)
- if result is None or result == "":
- return True
- result = babel.numbers.format_decimal(result, locale=ui_locale)
- search.result_container.answers['calculate'] = {'answer': f"{search.search_query.query} = {result}"}
- return True
+ res = timeout_func(0.05, _eval_expr, query_py_formatted)
+ if res is None or res == "":
+ return results
+
+ res = babel.numbers.format_decimal(res, locale=ui_locale)
+ Answer(results=results, answer=f"{search.search_query.query} = {res}")
+
+ return results
diff --git a/searx/plugins/hash_plugin.py b/searx/plugins/hash_plugin.py
index c27e2a432..9db3748b3 100644
--- a/searx/plugins/hash_plugin.py
+++ b/searx/plugins/hash_plugin.py
@@ -1,43 +1,66 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
-# pylint: disable=missing-module-docstring
+# pylint: disable=missing-module-docstring, missing-class-docstring
+from __future__ import annotations
+import typing
-import hashlib
import re
+import hashlib
from flask_babel import gettext
-name = "Hash plugin"
-description = gettext("Converts strings to different hash digests.")
-default_on = True
-preference_section = 'query'
-query_keywords = ['md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512']
-query_examples = 'sha512 The quick brown fox jumps over the lazy dog'
-
-parser_re = re.compile('(md5|sha1|sha224|sha256|sha384|sha512) (.*)', re.I)
-
-
-def post_search(_request, search):
- # process only on first page
- if search.search_query.pageno > 1:
- return True
- m = parser_re.match(search.search_query.query)
- if not m:
- # wrong query
- return True
-
- function, string = m.groups()
- if not string.strip():
- # end if the string is empty
- return True
-
- # select hash function
- f = hashlib.new(function.lower())
-
- # make digest from the given string
- f.update(string.encode('utf-8').strip())
- answer = function + " " + gettext('hash digest') + ": " + f.hexdigest()
-
- # print result
- search.result_container.answers.clear()
- search.result_container.answers['hash'] = {'answer': answer}
- return True
+from searx.plugins import Plugin, PluginInfo
+from searx.result_types import Answer
+
+if typing.TYPE_CHECKING:
+ from searx.search import SearchWithPlugins
+ from searx.extended_types import SXNG_Request
+
+
+class SXNGPlugin(Plugin):
+ """Plugin converts strings to different hash digests. The results are
+ displayed in area for the "answers".
+ """
+
+ id = "hash_plugin"
+ default_on = True
+ keywords = ["md5", "sha1", "sha224", "sha256", "sha384", "sha512"]
+
+ def __init__(self):
+ super().__init__()
+
+ self.parser_re = re.compile(f"({'|'.join(self.keywords)}) (.*)", re.I)
+ self.info = PluginInfo(
+ id=self.id,
+ name=gettext("Hash plugin"),
+ description=gettext("Converts strings to different hash digests."),
+ examples=["sha512 The quick brown fox jumps over the lazy dog"],
+ preference_section="query",
+ )
+
+ def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> list[Answer]:
+ """Returns a result list only for the first page."""
+ results = []
+
+ if search.search_query.pageno > 1:
+ return results
+
+ m = self.parser_re.match(search.search_query.query)
+ if not m:
+ # wrong query
+ return results
+
+ function, string = m.groups()
+ if not string.strip():
+ # end if the string is empty
+ return results
+
+ # select hash function
+ f = hashlib.new(function.lower())
+
+ # make digest from the given string
+ f.update(string.encode("utf-8").strip())
+ answer = function + " " + gettext("hash digest") + ": " + f.hexdigest()
+
+ Answer(results=results, answer=answer)
+
+ return results
diff --git a/searx/plugins/hostnames.py b/searx/plugins/hostnames.py
index 6519452db..f2d829103 100644
--- a/searx/plugins/hostnames.py
+++ b/searx/plugins/hostnames.py
@@ -91,15 +91,17 @@ something like this:
"""
+from __future__ import annotations
+
import re
from urllib.parse import urlunparse, urlparse
from flask_babel import gettext
from searx import settings
-from searx.plugins import logger
from searx.settings_loader import get_yaml_cfg
+
name = gettext('Hostnames plugin')
description = gettext('Rewrite hostnames, remove results or prioritize them based on the hostname')
default_on = False
@@ -107,16 +109,15 @@ preference_section = 'general'
plugin_id = 'hostnames'
-logger = logger.getChild(plugin_id)
parsed = 'parsed_url'
_url_fields = ['iframe_src', 'audio_src']
-def _load_regular_expressions(settings_key):
+def _load_regular_expressions(settings_key) -> dict | set | None:
setting_value = settings.get(plugin_id, {}).get(settings_key)
if not setting_value:
- return {}
+ return None
# load external file with configuration
if isinstance(setting_value, str):
@@ -128,20 +129,20 @@ def _load_regular_expressions(settings_key):
if isinstance(setting_value, dict):
return {re.compile(p): r for (p, r) in setting_value.items()}
- return {}
+ return None
-replacements = _load_regular_expressions('replace')
-removables = _load_regular_expressions('remove')
-high_priority = _load_regular_expressions('high_priority')
-low_priority = _load_regular_expressions('low_priority')
+replacements: dict = _load_regular_expressions('replace') or {} # type: ignore
+removables: set = _load_regular_expressions('remove') or set() # type: ignore
+high_priority: set = _load_regular_expressions('high_priority') or set() # type: ignore
+low_priority: set = _load_regular_expressions('low_priority') or set() # type: ignore
def _matches_parsed_url(result, pattern):
return parsed in result and pattern.search(result[parsed].netloc)
-def on_result(_request, _search, result):
+def on_result(_request, _search, result) -> bool:
for pattern, replacement in replacements.items():
if _matches_parsed_url(result, pattern):
# logger.debug(result['url'])
diff --git a/searx/plugins/oa_doi_rewrite.py b/searx/plugins/oa_doi_rewrite.py
index 6a214282c..be5a8d4a4 100644
--- a/searx/plugins/oa_doi_rewrite.py
+++ b/searx/plugins/oa_doi_rewrite.py
@@ -1,18 +1,21 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=missing-module-docstring
+from __future__ import annotations
import re
from urllib.parse import urlparse, parse_qsl
from flask_babel import gettext
+
from searx import settings
+
regex = re.compile(r'10\.\d{4,9}/[^\s]+')
name = gettext('Open Access DOI rewrite')
description = gettext('Avoid paywalls by redirecting to open-access versions of publications when available')
default_on = False
-preference_section = 'general'
+preference_section = 'general/doi_resolver'
def extract_doi(url):
@@ -34,8 +37,9 @@ def get_doi_resolver(preferences):
return doi_resolvers[selected_resolver]
-def on_result(request, _search, result):
- if 'parsed_url' not in result:
+def on_result(request, _search, result) -> bool:
+
+ if not result.parsed_url:
return True
doi = extract_doi(result['parsed_url'])
diff --git a/searx/plugins/self_info.py b/searx/plugins/self_info.py
index 7cad040d2..d1d7f12b9 100644
--- a/searx/plugins/self_info.py
+++ b/searx/plugins/self_info.py
@@ -1,32 +1,57 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
-# pylint: disable=missing-module-docstring,invalid-name
+# pylint: disable=missing-module-docstring, missing-class-docstring
+from __future__ import annotations
+import typing
import re
from flask_babel import gettext
from searx.botdetection._helpers import get_real_ip
+from searx.result_types import Answer
-name = gettext('Self Information')
-description = gettext('Displays your IP if the query is "ip" and your user agent if the query contains "user agent".')
-default_on = True
-preference_section = 'query'
-query_keywords = ['user-agent']
-query_examples = ''
-
-# "ip" or "my ip" regex
-ip_regex = re.compile('^ip$|my ip', re.IGNORECASE)
-
-# Self User Agent regex
-ua_regex = re.compile('.*user[ -]agent.*', re.IGNORECASE)
-
-
-def post_search(request, search):
- if search.search_query.pageno > 1:
- return True
- if ip_regex.search(search.search_query.query):
- ip = get_real_ip(request)
- search.result_container.answers['ip'] = {'answer': gettext('Your IP is: ') + ip}
- elif ua_regex.match(search.search_query.query):
- ua = request.user_agent
- search.result_container.answers['user-agent'] = {'answer': gettext('Your user-agent is: ') + ua.string}
- return True
+from . import Plugin, PluginInfo
+
+if typing.TYPE_CHECKING:
+ from searx.search import SearchWithPlugins
+ from searx.extended_types import SXNG_Request
+
+
+class SXNGPlugin(Plugin):
+ """Simple plugin that displays information about user's request, including
+ the IP or HTTP User-Agent. The information is displayed in area for the
+ "answers".
+ """
+
+ id = "self_info"
+ default_on = True
+ keywords = ["ip", "user-agent"]
+
+ def __init__(self):
+ super().__init__()
+
+ self.ip_regex = re.compile(r"^ip", re.IGNORECASE)
+ self.ua_regex = re.compile(r"^user-agent", re.IGNORECASE)
+
+ self.info = PluginInfo(
+ id=self.id,
+ name=gettext("Self Information"),
+ description=gettext(
+ """Displays your IP if the query is "ip" and your user agent if the query is "user-agent"."""
+ ),
+ preference_section="query",
+ )
+
+ def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> list[Answer]:
+ """Returns a result list only for the first page."""
+ results = []
+
+ if search.search_query.pageno > 1:
+ return results
+
+ if self.ip_regex.search(search.search_query.query):
+ Answer(results=results, answer=gettext("Your IP is: ") + get_real_ip(request))
+
+ if self.ua_regex.match(search.search_query.query):
+ Answer(results=results, answer=gettext("Your user-agent is: ") + str(request.user_agent))
+
+ return results
diff --git a/searx/plugins/tor_check.py b/searx/plugins/tor_check.py
index 3816d8ece..95281eb42 100644
--- a/searx/plugins/tor_check.py
+++ b/searx/plugins/tor_check.py
@@ -1,8 +1,8 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""A plugin to check if the ip address of the request is a Tor exit-node if the
user searches for ``tor-check``. It fetches the tor exit node list from
-https://check.torproject.org/exit-addresses and parses all the IPs into a list,
-then checks if the user's IP address is in it.
+:py:obj:`url_exit_list` and parses all the IPs into a list, then checks if the
+user's IP address is in it.
Enable in ``settings.yml``:
@@ -14,10 +14,15 @@ Enable in ``settings.yml``:
"""
+from __future__ import annotations
+
import re
from flask_babel import gettext
from httpx import HTTPError
+
from searx.network import get
+from searx.result_types import Answer
+
default_on = False
@@ -42,27 +47,28 @@ query_examples = ''
# Regex for exit node addresses in the list.
reg = re.compile(r"(?<=ExitAddress )\S+")
+url_exit_list = "https://check.torproject.org/exit-addresses"
+"""URL to load Tor exit list from."""
-def post_search(request, search):
+
+def post_search(request, search) -> list[Answer]:
+ results = []
if search.search_query.pageno > 1:
- return True
+ return results
if search.search_query.query.lower() == "tor-check":
# Request the list of tor exit nodes.
try:
- resp = get("https://check.torproject.org/exit-addresses")
- node_list = re.findall(reg, resp.text)
+ resp = get(url_exit_list)
+ node_list = re.findall(reg, resp.text) # type: ignore
except HTTPError:
# No answer, return error
- search.result_container.answers["tor"] = {
- "answer": gettext(
- "Could not download the list of Tor exit-nodes from: https://check.torproject.org/exit-addresses"
- )
- }
- return True
+ msg = gettext("Could not download the list of Tor exit-nodes from")
+ Answer(results=results, answer=f"{msg} {url_exit_list}")
+ return results
x_forwarded_for = request.headers.getlist("X-Forwarded-For")
@@ -72,20 +78,11 @@ def post_search(request, search):
ip_address = request.remote_addr
if ip_address in node_list:
- search.result_container.answers["tor"] = {
- "answer": gettext(
- "You are using Tor and it looks like you have this external IP address: {ip_address}".format(
- ip_address=ip_address
- )
- )
- }
+ msg = gettext("You are using Tor and it looks like you have the external IP address")
+ Answer(results=results, answer=f"{msg} {ip_address}")
+
else:
- search.result_container.answers["tor"] = {
- "answer": gettext(
- "You are not using Tor and you have this external IP address: {ip_address}".format(
- ip_address=ip_address
- )
- )
- }
-
- return True
+ msg = gettext("You are not using Tor and you have the external IP address")
+ Answer(results=results, answer=f"{msg} {ip_address}")
+
+ return results
diff --git a/searx/plugins/tracker_url_remover.py b/searx/plugins/tracker_url_remover.py
index 2961cd026..f33f7fdfd 100644
--- a/searx/plugins/tracker_url_remover.py
+++ b/searx/plugins/tracker_url_remover.py
@@ -1,6 +1,8 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=missing-module-docstring
+from __future__ import annotations
+
import re
from urllib.parse import urlunparse, parse_qsl, urlencode
@@ -19,24 +21,24 @@ default_on = True
preference_section = 'privacy'
-def on_result(_request, _search, result):
- if 'parsed_url' not in result:
- return True
+def on_result(_request, _search, result) -> bool:
- query = result['parsed_url'].query
+ parsed_url = getattr(result, "parsed_url", None)
+ if not parsed_url:
+ return True
- if query == "":
+ if parsed_url.query == "":
return True
- parsed_query = parse_qsl(query)
+ parsed_query = parse_qsl(parsed_url.query)
changes = 0
for i, (param_name, _) in enumerate(list(parsed_query)):
for reg in regexes:
if reg.match(param_name):
parsed_query.pop(i - changes)
changes += 1
- result['parsed_url'] = result['parsed_url']._replace(query=urlencode(parsed_query))
- result['url'] = urlunparse(result['parsed_url'])
+ result.parsed_url = result.parsed_url._replace(query=urlencode(parsed_query))
+ result.url = urlunparse(result.parsed_url)
break
return True
diff --git a/searx/plugins/unit_converter.py b/searx/plugins/unit_converter.py
index c85996a76..cdd8287fe 100644
--- a/searx/plugins/unit_converter.py
+++ b/searx/plugins/unit_converter.py
@@ -18,11 +18,14 @@ Enable in ``settings.yml``:
"""
+from __future__ import annotations
import re
import babel.numbers
+
from flask_babel import gettext, get_locale
from searx import data
+from searx.result_types import Answer
name = "Unit converter plugin"
@@ -171,16 +174,16 @@ def symbol_to_si():
return SYMBOL_TO_SI
-def _parse_text_and_convert(search, from_query, to_query):
+def _parse_text_and_convert(from_query, to_query) -> str | None:
# pylint: disable=too-many-branches, too-many-locals
if not (from_query and to_query):
- return
+ return None
measured = re.match(RE_MEASURE, from_query, re.VERBOSE)
if not (measured and measured.group('number'), measured.group('unit')):
- return
+ return None
# Symbols are not unique, if there are several hits for the from-unit, then
# the correct one must be determined by comparing it with the to-unit
@@ -198,7 +201,7 @@ def _parse_text_and_convert(search, from_query, to_query):
target_list.append((si_name, from_si, orig_symbol))
if not (source_list and target_list):
- return
+ return None
source_to_si = target_from_si = target_symbol = None
@@ -212,7 +215,7 @@ def _parse_text_and_convert(search, from_query, to_query):
target_symbol = target[2]
if not (source_to_si and target_from_si):
- return
+ return None
_locale = get_locale() or 'en_US'
@@ -239,25 +242,28 @@ def _parse_text_and_convert(search, from_query, to_query):
else:
result = babel.numbers.format_decimal(value, locale=_locale, format='#,##0.##########;-#')
- search.result_container.answers['conversion'] = {'answer': f'{result} {target_symbol}'}
+ return f'{result} {target_symbol}'
+
+def post_search(_request, search) -> list[Answer]:
+ results = []
-def post_search(_request, search):
# only convert between units on the first page
if search.search_query.pageno > 1:
- return True
+ return results
query = search.search_query.query
query_parts = query.split(" ")
if len(query_parts) < 3:
- return True
+ return results
for query_part in query_parts:
for keyword in CONVERT_KEYWORDS:
if query_part == keyword:
from_query, to_query = query.split(keyword, 1)
- _parse_text_and_convert(search, from_query.strip(), to_query.strip())
- return True
+ target_val = _parse_text_and_convert(from_query.strip(), to_query.strip())
+ if target_val:
+ Answer(results=results, answer=target_val)
- return True
+ return results