diff options
Diffstat (limited to 'searx/botdetection/link_token.py')
| -rw-r--r-- | searx/botdetection/link_token.py | 156 |
1 files changed, 156 insertions, 0 deletions
diff --git a/searx/botdetection/link_token.py b/searx/botdetection/link_token.py new file mode 100644 index 000000000..11a6a56b5 --- /dev/null +++ b/searx/botdetection/link_token.py @@ -0,0 +1,156 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +# lint: pylint +""" +Method ``link_token`` +--------------------- + +The ``link_token`` method evaluates a request as :py:obj:`suspicious +<is_suspicious>` if the URL ``/client<token>.css`` is not requested by the +client. By adding a random component (the token) in the URL, a bot can not send +a ping by request a static URL. + +.. note:: + + This method requires a redis DB and needs a HTTP X-Forwarded-For_ header. + +To get in use of this method a flask URL route needs to be added: + +.. code:: python + + @app.route('/client<token>.css', methods=['GET', 'POST']) + def client_token(token=None): + link_token.ping(request, token) + return Response('', mimetype='text/css') + +And in the HTML template from flask a stylesheet link is needed (the value of +``link_token`` comes from :py:obj:`get_token`): + +.. code:: html + + <link rel="stylesheet" + href="{{ url_for('client_token', token=link_token) }}" + type="text/css" /> + +.. _X-Forwarded-For: + https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For + +""" +from __future__ import annotations +from ipaddress import ( + IPv4Network, + IPv6Network, +) + +import string +import random +import flask + +from searx import logger +from searx import redisdb +from searx.redislib import secret_hash + +from ._helpers import ( + get_network, + get_real_ip, +) + +TOKEN_LIVE_TIME = 600 +"""Livetime (sec) of limiter's CSS token.""" + +PING_LIVE_TIME = 3600 +"""Livetime (sec) of the ping-key from a client (request)""" + +PING_KEY = 'SearXNG_limiter.ping' +"""Prefix of all ping-keys generated by :py:obj:`get_ping_key`""" + +TOKEN_KEY = 'SearXNG_limiter.token' +"""Key for which the current token is stored in the DB""" + +logger = logger.getChild('botdetection.link_token') + + +def is_suspicious(network: IPv4Network | IPv6Network, request: flask.Request, renew: bool = False): + """Checks whether a valid ping is exists for this (client) network, if not + this request is rated as *suspicious*. If a valid ping exists and argument + ``renew`` is ``True`` the expire time of this ping is reset to + :py:obj:`PING_LIVE_TIME`. + + """ + redis_client = redisdb.client() + if not redis_client: + return False + + ping_key = get_ping_key(network, request) + if not redis_client.get(ping_key): + logger.warning("missing ping (IP: %s) / request: %s", network.compressed, ping_key) + return True + + if renew: + redis_client.set(ping_key, 1, ex=PING_LIVE_TIME) + + logger.debug("found ping for (client) network %s -> %s", network.compressed, ping_key) + return False + + +def ping(request: flask.Request, token: str): + """This function is called by a request to URL ``/client<token>.css``. If + ``token`` is valid a :py:obj:`PING_KEY` for the client is stored in the DB. + The expire time of this ping-key is :py:obj:`PING_LIVE_TIME`. + + """ + from . import limiter # pylint: disable=import-outside-toplevel, cyclic-import + + redis_client = redisdb.client() + if not redis_client: + return + if not token_is_valid(token): + return + + cfg = limiter.get_cfg() + real_ip = get_real_ip(request) + network = get_network(real_ip, cfg) + + ping_key = get_ping_key(network, request) + logger.debug("store ping_key for (client) network %s (IP %s) -> %s", network.compressed, real_ip, ping_key) + redis_client.set(ping_key, 1, ex=PING_LIVE_TIME) + + +def get_ping_key(network: IPv4Network | IPv6Network, request: flask.Request) -> str: + """Generates a hashed key that fits (more or less) to a *WEB-browser + session* in a network.""" + return ( + PING_KEY + + "[" + + secret_hash( + network.compressed + request.headers.get('Accept-Language', '') + request.headers.get('User-Agent', '') + ) + + "]" + ) + + +def token_is_valid(token) -> bool: + valid = token == get_token() + logger.debug("token is valid --> %s", valid) + return valid + + +def get_token() -> str: + """Returns current token. If there is no currently active token a new token + is generated randomly and stored in the redis DB. + + - :py:obj:`TOKEN_LIVE_TIME` + - :py:obj:`TOKEN_KEY` + + """ + redis_client = redisdb.client() + if not redis_client: + # This function is also called when limiter is inactive / no redis DB + # (see render function in webapp.py) + return '12345678' + token = redis_client.get(TOKEN_KEY) + if token: + token = token.decode('UTF-8') + else: + token = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(16)) + redis_client.set(TOKEN_KEY, token, ex=TOKEN_LIVE_TIME) + return token |