diff options
Diffstat (limited to 'searx/result_types')
| -rw-r--r-- | searx/result_types/_base.py | 143 |
1 files changed, 138 insertions, 5 deletions
diff --git a/searx/result_types/_base.py b/searx/result_types/_base.py index 1cd4e4d2d..c4c0b18b2 100644 --- a/searx/result_types/_base.py +++ b/searx/result_types/_base.py @@ -26,11 +26,14 @@ import urllib.parse import warnings import typing +from collections.abc import Callable + import msgspec from searx import logger as log WHITESPACE_REGEX = re.compile('( |\t|\n)+', re.M | re.U) +UNKNOWN = object() def _normalize_url_fields(result: Result | LegacyResult): @@ -50,8 +53,6 @@ def _normalize_url_fields(result: Result | LegacyResult): result.parsed_url = result.parsed_url._replace( # if the result has no scheme, use http as default scheme=result.parsed_url.scheme or "http", - # normalize ``www.example.com`` to ``example.com`` - # netloc=result.parsed_url.netloc.replace("www.", ""), # normalize ``example.com/path/`` to ``example.com/path`` path=result.parsed_url.path.rstrip("/"), ) @@ -107,6 +108,110 @@ def _normalize_text_fields(result: MainResult | LegacyResult): result.content = "" +def _filter_urls(result: Result | LegacyResult, filter_func: Callable[[Result | LegacyResult, str, str], str | bool]): + # pylint: disable=too-many-branches, too-many-statements + + # As soon we need LegacyResult not any longer, we can move this function to + # method Result. + + url_fields = ["url", "iframe_src", "audio_src", "img_src", "thumbnail_src", "thumbnail"] + + for field_name in url_fields: + url_src = getattr(result, field_name, "") + if not url_src: + continue + + new_url = filter_func(result, field_name, url_src) + # log.debug("filter_urls: filter_func(result, %s) '%s' -> '%s'", field_name, field_value, new_url) + if isinstance(new_url, bool): + if new_url: + # log.debug("filter_urls: unchanged field %s URL %s", field_name, field_value) + continue + log.debug("filter_urls: drop field %s URL %s", field_name, url_src) + new_url = None + else: + log.debug("filter_urls: modify field %s URL %s -> %s", field_name, url_src, new_url) + + setattr(result, field_name, new_url) + if field_name == "url": + # sync parsed_url with new_url + if not new_url: + result.parsed_url = None + elif isinstance(new_url, str): + result.parsed_url = urllib.parse.urlparse(new_url) + + # "urls": are from infobox + # + # As soon we have InfoboxResult, we can move this function to method + # InfoboxResult.normalize_result_fields + + infobox_urls: list[dict[str, str]] = getattr(result, "urls", []) + + if infobox_urls: + # log.debug("filter_urls: infobox_urls .. %s", infobox_urls) + new_infobox_urls: list[dict[str, str]] = [] + + for item in infobox_urls: + url_src = item.get("url") + if not url_src: + new_infobox_urls.append(item) + continue + + new_url = filter_func(result, "infobox_urls", url_src) + if isinstance(new_url, bool): + if new_url: + new_infobox_urls.append(item) + # log.debug("filter_urls: leave URL in field 'urls' ('infobox_urls') unchanged -> %s", _url) + continue + log.debug("filter_urls: remove URL from field 'urls' ('infobox_urls') URL %s", url_src) + new_url = None + if new_url: + log.debug("filter_urls: modify URL from field 'urls' ('infobox_urls') URL %s -> %s", url_src, new_url) + item["url"] = new_url + new_infobox_urls.append(item) + + setattr(result, "urls", new_infobox_urls) + + # "attributes": are from infobox + # + # The infobox has additional subsections for attributes, urls and relatedTopics: + + infobox_attributes: list[dict[str, dict]] = getattr(result, "attributes", []) + + if infobox_attributes: + # log.debug("filter_urls: infobox_attributes .. %s", infobox_attributes) + new_infobox_attributes: list[dict[str, dict]] = [] + + for item in infobox_attributes: + image = item.get("image", {}) + url_src = image.get("src", "") + if not url_src: + new_infobox_attributes.append(item) + continue + + new_url = filter_func(result, "infobox_attributes", url_src) + if isinstance(new_url, bool): + if new_url: + new_infobox_attributes.append(item) + # log.debug("filter_urls: leave URL in field 'image.src' unchanged -> %s", url_src) + continue + log.debug("filter_urls: drop field 'image.src' ('infobox_attributes') URL %s", url_src) + new_url = None + + if new_url: + log.debug( + "filter_urls: modify 'image.src' ('infobox_attributes') URL %s -> %s", + url_src, + new_url, + ) + item["image"]["src"] = new_url + new_infobox_attributes.append(item) + + setattr(result, "attributes", new_infobox_attributes) + + result.normalize_result_fields() + + class Result(msgspec.Struct, kw_only=True): """Base class of all result types :ref:`result types`.""" @@ -142,9 +247,6 @@ class Result(msgspec.Struct, kw_only=True): with the resulting value in ``parse_url``, if ``url`` and ``parse_url`` are not equal. - - ``www.example.com`` and ``example.com`` are equivalent and are normalized - to ``example.com``. - - ``example.com/path/`` and ``example.com/path`` are equivalent and are normalized to ``example.com/path``. """ @@ -153,6 +255,33 @@ class Result(msgspec.Struct, kw_only=True): def __post_init__(self): pass + def filter_urls(self, filter_func: Callable[[Result | LegacyResult, str, str], str | bool]): + """A filter function is passed in the ``filter_func`` argument to + filter and/or modify the URLs. + + The filter function receives the :py:obj:`result object <Result>` as + the first argument and the field name (``str``) in the second argument. + In the third argument the URL string value is passed to the filter function. + + The filter function is applied to all fields that contain a URL, + in addition to the familiar ``url`` field, these include fields such as:: + + ["url", "iframe_src", "audio_src", "img_src", "thumbnail_src", "thumbnail"] + + and the ``urls`` list of items of the infobox. + + For each field, the filter function is called and returns a bool or a + string value: + + - ``True``: leave URL in field unchanged + - ``False``: remove URL field from result (or remove entire result) + - ``str``: modified URL to be used instead + + See :ref:`filter urls example`. + + """ + _filter_urls(self, filter_func=filter_func) + def __hash__(self) -> int: """Generates a hash value that uniquely identifies the content of *this* result. The method can be adapted in the inheritance to compare results @@ -394,3 +523,7 @@ class LegacyResult(dict): for k, v in other.items(): if not self.get(k): self[k] = v + + def filter_urls(self, filter_func: Callable[[Result | LegacyResult, str, str], str | bool]): + """See :py:obj:`Result.filter_urls`""" + _filter_urls(self, filter_func=filter_func) |