diff options
Diffstat (limited to 'searx/data/tracker_patterns.py')
| -rw-r--r-- | searx/data/tracker_patterns.py | 142 |
1 file changed, 142 insertions, 0 deletions
diff --git a/searx/data/tracker_patterns.py b/searx/data/tracker_patterns.py new file mode 100644 index 000000000..f269b8395 --- /dev/null +++ b/searx/data/tracker_patterns.py @@ -0,0 +1,142 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +"""Simple implementation to store TrackerPatterns data in a SQL database.""" + +from __future__ import annotations +import typing + +__all__ = ["TrackerPatternsDB"] + +import re +import pathlib +from collections.abc import Iterator +from urllib.parse import urlparse, urlunparse, parse_qsl, urlencode + +import httpx + +from searx.data.core import get_cache, log + +RuleType = tuple[str, list[str], list[str]] + + +class TrackerPatternsDB: + # pylint: disable=missing-class-docstring + + ctx_name = "data_tracker_patterns" + json_file = pathlib.Path(__file__).parent / "tracker_patterns.json" + + CLEAR_LIST_URL = [ + # ClearURL rule lists, the first one that responds HTTP 200 is used + "https://rules1.clearurls.xyz/data.minify.json", + "https://rules2.clearurls.xyz/data.minify.json", + "https://raw.githubusercontent.com/ClearURLs/Rules/refs/heads/master/data.min.json", + ] + + class Fields: + # pylint: disable=too-few-public-methods, invalid-name + url_regexp: typing.Final = 0 # URL (regular expression) match condition of the link + url_ignore: typing.Final = 1 # URL (regular expression) to ignore + del_args: typing.Final = 2 # list of URL arguments (regular expression) to delete + + def __init__(self): + self.cache = get_cache() + + def init(self): + if self.cache.properties("tracker_patterns loaded") != "OK": + self.load() + self.cache.properties.set("tracker_patterns loaded", "OK") + # F I X M E: + # do we need a maintenance .. 
rember: database is stored + # in /tmp and will be rebuild during the reboot anyway + + def load(self): + log.debug("init searx.data.TRACKER_PATTERNS") + for rule in self.iter_clear_list(): + self.add(rule) + + def add(self, rule: RuleType): + self.cache.set( + key=rule[self.Fields.url_regexp], + value=( + rule[self.Fields.url_ignore], + rule[self.Fields.del_args], + ), + ctx=self.ctx_name, + expire=None, + ) + + def rules(self) -> Iterator[RuleType]: + self.init() + for key, value in self.cache.pairs(ctx=self.ctx_name): + yield key, value[0], value[1] + + def iter_clear_list(self) -> Iterator[RuleType]: + resp = None + for url in self.CLEAR_LIST_URL: + resp = httpx.get(url, timeout=3) + if resp.status_code == 200: + break + log.warning(f"TRACKER_PATTERNS: ClearURL ignore HTTP {resp.status_code} {url}") + + if resp is None: + log.error("TRACKER_PATTERNS: failed fetching ClearURL rule lists") + return + + for rule in resp.json()["providers"].values(): + yield ( + rule["urlPattern"].replace("\\\\", "\\"), # fix javascript regex syntax + [exc.replace("\\\\", "\\") for exc in rule.get("exceptions", [])], + rule.get("rules", []), + ) + + def clean_url(self, url: str) -> bool | str: + """The URL arguments are normalized and cleaned of tracker parameters. + + Returns bool ``True`` to use URL unchanged (``False`` to ignore URL). + If URL should be modified, the returned string is the new URL to use. 
+ """ + + new_url = url + parsed_new_url = urlparse(url=new_url) + + for rule in self.rules(): + + if not re.match(rule[self.Fields.url_regexp], new_url): + # no match / ignore pattern + continue + + do_ignore = False + for pattern in rule[self.Fields.url_ignore]: + if re.match(pattern, new_url): + do_ignore = True + break + + if do_ignore: + # pattern is in the list of exceptions / ignore pattern + # HINT: + # we can't break the outer pattern loop since we have + # overlapping urlPattern like ".*" + continue + + # remove tracker arguments from the url-query part + query_args: list[tuple[str, str]] = list(parse_qsl(parsed_new_url.query)) + + for name, val in query_args.copy(): + # remove URL arguments + for pattern in rule[self.Fields.del_args]: + if re.match(pattern, name): + log.debug("TRACKER_PATTERNS: %s remove tracker arg: %s='%s'", parsed_new_url.netloc, name, val) + query_args.remove((name, val)) + + parsed_new_url = parsed_new_url._replace(query=urlencode(query_args)) + new_url = urlunparse(parsed_new_url) + + if new_url != url: + return new_url + + return True + + +if __name__ == "__main__": + db = TrackerPatternsDB() + for r in db.rules(): + print(r) |