summaryrefslogtreecommitdiff
path: root/searx/botdetection/trusted_proxies.py
blob: 4fb4c04abe8aebb14c3fd121f7a501594a70763b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Implementation of a middleware to determine the real IP of an HTTP request
(:py:obj:`flask.request.remote_addr`) behind a proxy chain."""
# pylint: disable=too-many-branches


import typing as t

from collections import abc
from ipaddress import IPv4Address, IPv6Address, ip_address, ip_network, IPv4Network, IPv6Network
from werkzeug.http import parse_list_header

from . import config
from ._helpers import log_error_only_once, logger

if t.TYPE_CHECKING:
    from _typeshed.wsgi import StartResponse
    from _typeshed.wsgi import WSGIApplication
    from _typeshed.wsgi import WSGIEnvironment


@t.final
class ProxyFix:
    """A middleware like the ProxyFix_ class, where the ``x_for`` argument is
    replaced by a method that determines the number of trusted proxies via the
    ``botdetection.trusted_proxies`` setting.

    .. sidebar:: :py:obj:`flask.Request.remote_addr`

       SearXNG uses Werkzeug's ProxyFix_ (with it default ``x_for=1``).

    The remote IP (:py:obj:`flask.Request.remote_addr`) of the request is taken
    from (first match):

    - X-Forwarded-For_: If the header is set, the first untrusted IP that comes
      before the IPs that are still part of the ``botdetection.trusted_proxies``
      is used.

    - `X-Real-IP <https://github.com/searxng/searxng/issues/1237#issuecomment-1147564516>`__:
      If X-Forwarded-For_ is not set, `X-Real-IP` is used
      (``botdetection.trusted_proxies`` is ignored).

    If none of the header is set, the REMOTE_ADDR_ from the WSGI layer is used.
    If (for whatever reasons) none IP can be determined, an error message is
    displayed and ``100::`` is used instead (:rfc:`6666`).

    .. _ProxyFix:
       https://werkzeug.palletsprojects.com/middleware/proxy_fix/

    .. _X-Forwarded-For:
       https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For

    .. _REMOTE_ADDR:
       https://wsgi.readthedocs.io/en/latest/proposals-2.0.html#making-some-keys-required

    """

    def __init__(self, wsgi_app: "WSGIApplication") -> None:
        self.wsgi_app = wsgi_app

    def trusted_proxies(self) -> list[IPv4Network | IPv6Network]:
        cfg = config.get_global_cfg()
        proxy_list: list[str] = cfg.get("botdetection.trusted_proxies", default=[])
        return [ip_network(net, strict=False) for net in proxy_list]

    def trusted_remote_addr(
        self,
        x_forwarded_for: list[IPv4Address | IPv6Address],
        trusted_proxies: list[IPv4Network | IPv6Network],
    ) -> str:
        # always rtl
        for addr in reversed(x_forwarded_for):
            trust: bool = False

            for net in trusted_proxies:
                if addr.version == net.version and addr in net:
                    logger.debug("trust proxy %s (member of %s)", addr, net)
                    trust = True
                    break

            # client address
            if not trust:
                return addr.compressed

        # fallback to first address
        return x_forwarded_for[0].compressed

    def __call__(self, environ: "WSGIEnvironment", start_response: "StartResponse") -> abc.Iterable[bytes]:
        # pylint: disable=too-many-statements

        trusted_proxies = self.trusted_proxies()

        # We do not rely on the REMOTE_ADDR from the WSGI environment / the
        # variable is first removed from the WSGI environment and explicitly set
        # in this function!

        orig_remote_addr: str | None = environ.pop("REMOTE_ADDR")

        # Validate the IPs involved in this game and delete all invalid ones
        # from the WSGI environment.

        if orig_remote_addr:
            try:
                addr = ip_address(orig_remote_addr)
                if addr.version == 6 and addr.ipv4_mapped:
                    addr = addr.ipv4_mapped
                orig_remote_addr = addr.compressed
            except ValueError as exc:
                logger.error("REMOTE_ADDR: %s / discard REMOTE_ADDR from WSGI environment", exc)
                orig_remote_addr = None

        x_real_ip: str | None = environ.get("HTTP_X_REAL_IP")
        if x_real_ip:
            try:
                addr = ip_address(x_real_ip)
                if addr.version == 6 and addr.ipv4_mapped:
                    addr = addr.ipv4_mapped
                x_real_ip = addr.compressed
            except ValueError as exc:
                logger.error("X-Real-IP: %s / discard HTTP_X_REAL_IP from WSGI environment", exc)
                environ.pop("HTTP_X_REAL_IP")
                x_real_ip = None

        x_forwarded_for: list[IPv4Address | IPv6Address] = []
        if environ.get("HTTP_X_FORWARDED_FOR"):
            for x_for_ip in parse_list_header(str(environ.get("HTTP_X_FORWARDED_FOR"))):
                try:
                    addr = ip_address(x_for_ip)
                except ValueError as exc:
                    logger.error("X-Forwarded-For: %s / discard HTTP_X_FORWARDED_FOR from WSGI environment", exc)
                    environ.pop("HTTP_X_FORWARDED_FOR")
                    x_forwarded_for = []
                    break

                if addr.version == 6 and addr.ipv4_mapped:
                    addr = addr.ipv4_mapped
                x_forwarded_for.append(addr)

        # log questionable WSGI environments

        if not x_forwarded_for and not x_real_ip:
            log_error_only_once("X-Forwarded-For nor X-Real-IP header is set!")

        if x_forwarded_for and not trusted_proxies:
            log_error_only_once("missing botdetection.trusted_proxies config")
            # without trusted_proxies, this variable is useless for determining
            # the real IP
            x_forwarded_for = []

        # securing the WSGI environment variables that are adjusted

        environ.update({"botdetection.trusted_proxies.orig": {"REMOTE_ADDR": orig_remote_addr}})

        # determine *the real IP*

        if x_forwarded_for:
            environ["REMOTE_ADDR"] = self.trusted_remote_addr(x_forwarded_for, trusted_proxies)

        elif x_real_ip:
            environ["REMOTE_ADDR"] = x_real_ip

        elif orig_remote_addr:
            environ["REMOTE_ADDR"] = orig_remote_addr

        else:
            logger.error("No remote IP could be determined, use black-hole address: 100::")
            environ["REMOTE_ADDR"] = "100::"

        try:
            _ = ip_address(environ["REMOTE_ADDR"])
        except ValueError as exc:
            logger.error("REMOTE_ADDR: %s, use black-hole address: 100::", exc)
            environ["REMOTE_ADDR"] = "100::"

        logger.debug("final REMOTE_ADDR is: %s", environ["REMOTE_ADDR"])
        return self.wsgi_app(environ, start_response)