summaryrefslogtreecommitdiff
path: root/searx/botdetection/link_token.py
blob: a83214a33c17889457fcff5c97ceff2c2776dc0c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
"""
Method ``link_token``
---------------------

The ``link_token`` method evaluates a request as :py:obj:`suspicious
<is_suspicious>` if the URL ``/client<token>.css`` is not requested by the
client.  By adding a random component (the token) to the URL, a bot cannot send
a ping by simply requesting a static URL.

.. note::

   This method requires a redis DB and needs a HTTP X-Forwarded-For_ header.

To make use of this method, a flask URL route needs to be added:

.. code:: python

   @app.route('/client<token>.css', methods=['GET', 'POST'])
   def client_token(token=None):
       link_token.ping(request, token)
       return Response('', mimetype='text/css')

And in the HTML template from flask a stylesheet link is needed (the value of
``link_token`` comes from :py:obj:`get_token`):

.. code:: html

   <link rel="stylesheet"
         href="{{ url_for('client_token', token=link_token) }}"
         type="text/css" />

.. _X-Forwarded-For:
   https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For

"""

import string
import random
import flask

from searx import logger
from searx import redisdb
from searx.redislib import secret_hash
from ._helpers import get_real_ip

# Expiry (in seconds) passed as ``ex=`` when the CSS token is stored in redis
# by get_token().
TOKEN_LIVE_TIME = 600
"""Livetime (sec) of limiter's CSS token."""

# Expiry (in seconds) passed as ``ex=`` when a ping-key is stored by ping() or
# renewed by is_suspicious().
PING_LIVE_TIME = 3600
"""Livetime (sec) of the ping-key from a client (request)"""

# get_ping_key() builds keys of the form ``<PING_KEY>[<hash>]``.
PING_KEY = 'SearXNG_limiter.ping'
"""Prefix of all ping-keys generated by :py:obj:`get_ping_key`"""

# Redis key read and written by get_token().
TOKEN_KEY = 'SearXNG_limiter.token'
"""Key for which the current token is stored in the DB"""

# Rebinds the module-level name ``logger`` (imported from searx above) to a
# child logger dedicated to this module.
logger = logger.getChild('botdetection.link_token')


def is_suspicious(request: flask.Request, renew: bool = False):
    """Rate a request as *suspicious* when no ping is stored for its client.

    The ping-key of the request is built by :py:obj:`get_ping_key` and looked
    up in the redis DB.

    :param request: the flask request to check.
    :param renew: if ``True`` and a ping exists, the ping is stored again with
        an expire time of :py:obj:`PING_LIVE_TIME`.
    :return: ``True`` if no ping was found for the request, ``False`` if a
        ping exists or if no redis client is available.

    """
    db = redisdb.client()
    if not db:
        # Without a redis DB there is nothing to look up; do not flag.
        return False

    key = get_ping_key(request)
    ping_found = bool(db.get(key))

    if ping_found:
        if renew:
            # Store the ping again so its expire time starts over.
            db.set(key, 1, ex=PING_LIVE_TIME)
        logger.debug("found ping for client request: %s", key)
        return False

    logger.warning(
        "missing ping (IP: %s) / request: %s",
        get_real_ip(request),
        key,
    )
    return True


def ping(request: flask.Request, token: str):
    """Store a ping for the client of ``request`` if ``token`` is valid.

    Intended to be called by the handler of URL ``/client<token>.css``.  The
    ping is stored under the key from :py:obj:`get_ping_key` with an expire
    time of :py:obj:`PING_LIVE_TIME`.  Nothing is stored when no redis client
    is available or when :py:obj:`token_is_valid` rejects the token.

    :param request: the flask request that delivered the token.
    :param token: the token taken from the requested URL.

    """
    db = redisdb.client()
    # Short-circuit: the token is only validated when a redis client exists.
    if db and token_is_valid(token):
        key = get_ping_key(request)
        logger.debug("store ping for: %s", key)
        db.set(key, 1, ex=PING_LIVE_TIME)


def get_ping_key(request: flask.Request):
    """Build the DB key under which the ping of a client (request) is stored.

    The key is :py:obj:`PING_KEY` followed by ``[<hash>]``, where the hash is
    ``secret_hash`` applied to the concatenation of the client's real IP, the
    ``Accept-Language`` header and the ``User-Agent`` header (missing headers
    count as empty strings).  At least X-Forwarded-For_ is needed to be able
    to assign the request to an IP.

    :param request: the flask request to derive the key from.
    :return: the ping-key as a string.

    """
    client_ip = get_real_ip(request)
    accept_language = request.headers.get('Accept-Language', '')
    user_agent = request.headers.get('User-Agent', '')

    fingerprint = client_ip + accept_language + user_agent
    digest = secret_hash(fingerprint)
    return PING_KEY + "[" + digest + "]"


def token_is_valid(token) -> bool:
    """Check ``token`` against the token returned by :py:obj:`get_token`.

    :param token: the token to check.
    :return: ``True`` if ``token`` equals the current token.

    """
    current = get_token()
    is_match = token == current
    logger.debug("token is valid --> %s", is_match)
    return is_match


def get_token() -> str:
    """Returns current token.  If there is no currently active token a new token
    is generated randomly and stored in the redis DB.

    - :py:obj:`TOKEN_LIVE_TIME`
    - :py:obj:`TOKEN_KEY`

    The new token consists of 16 characters (lowercase ASCII letters and
    digits) drawn from the operating system's CSPRNG, so that it can not be
    predicted from previously observed tokens.

    :return: the stored token, a freshly generated token, or the fixed
        placeholder ``'12345678'`` when no redis client is available.

    """
    redis_client = redisdb.client()
    if not redis_client:
        # This function is also called when limiter is inactive / no redis DB
        # (see render function in webapp.py)
        return '12345678'

    stored = redis_client.get(TOKEN_KEY)
    if stored:
        # The stored value is decoded from bytes to str before it is returned.
        return stored.decode('UTF-8')

    # The token is the unguessable part of the ping URL: use the OS-backed
    # generator instead of the default (predictable) Mersenne Twister.
    rng = random.SystemRandom()
    alphabet = string.ascii_lowercase + string.digits
    token = ''.join(rng.choice(alphabet) for _ in range(16))
    redis_client.set(TOKEN_KEY, token, ex=TOKEN_LIVE_TIME)
    return token