summaryrefslogtreecommitdiff
path: root/searx
diff options
context:
space:
mode:
Diffstat (limited to 'searx')
-rw-r--r--searx/engines/duckduckgo_weather.py8
-rw-r--r--searx/engines/open_meteo.py12
-rw-r--r--searx/engines/wttr.py12
-rw-r--r--searx/plugins/_core.py30
-rw-r--r--searx/plugins/time_zone.py7
-rw-r--r--searx/weather.py195
-rw-r--r--searx/webutils.py3
7 files changed, 141 insertions, 126 deletions
diff --git a/searx/engines/duckduckgo_weather.py b/searx/engines/duckduckgo_weather.py
index 9fad1e546..4d52effcd 100644
--- a/searx/engines/duckduckgo_weather.py
+++ b/searx/engines/duckduckgo_weather.py
@@ -76,12 +76,12 @@ def _weather_data(location: weather.GeoLocation, data: dict[str, t.Any]):
return EngineResults.types.WeatherAnswer.Item(
location=location,
- temperature=weather.Temperature(unit="°C", value=data['temperature']),
+ temperature=weather.Temperature(val=data['temperature'], unit="°C"),
condition=WEATHERKIT_TO_CONDITION[data["conditionCode"]],
- feels_like=weather.Temperature(unit="°C", value=data['temperatureApparent']),
+ feels_like=weather.Temperature(val=data['temperatureApparent'], unit="°C"),
wind_from=weather.Compass(data["windDirection"]),
- wind_speed=weather.WindSpeed(data["windSpeed"], unit="mi/h"),
- pressure=weather.Pressure(data["pressure"], unit="hPa"),
+ wind_speed=weather.WindSpeed(val=data["windSpeed"], unit="mi/h"),
+ pressure=weather.Pressure(val=data["pressure"], unit="hPa"),
humidity=weather.RelativeHumidity(data["humidity"] * 100),
cloud_cover=data["cloudCover"] * 100,
)
diff --git a/searx/engines/open_meteo.py b/searx/engines/open_meteo.py
index 31ada12b0..948996b3c 100644
--- a/searx/engines/open_meteo.py
+++ b/searx/engines/open_meteo.py
@@ -1,6 +1,8 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Open Meteo (weather)"""
+import typing as t
+
from urllib.parse import urlencode
from datetime import datetime
@@ -106,16 +108,16 @@ WMO_TO_CONDITION: dict[int, weather.WeatherConditionType] = {
}
-def _weather_data(location: weather.GeoLocation, data: dict):
+def _weather_data(location: weather.GeoLocation, data: dict[str, t.Any]):
return WeatherAnswer.Item(
location=location,
- temperature=weather.Temperature(unit="°C", value=data["temperature_2m"]),
+ temperature=weather.Temperature(val=data["temperature_2m"], unit="°C"),
condition=WMO_TO_CONDITION[data["weather_code"]],
- feels_like=weather.Temperature(unit="°C", value=data["apparent_temperature"]),
+ feels_like=weather.Temperature(val=data["apparent_temperature"], unit="°C"),
wind_from=weather.Compass(data["wind_direction_10m"]),
- wind_speed=weather.WindSpeed(data["wind_speed_10m"], unit="km/h"),
- pressure=weather.Pressure(data["pressure_msl"], unit="hPa"),
+ wind_speed=weather.WindSpeed(val=data["wind_speed_10m"], unit="km/h"),
+ pressure=weather.Pressure(val=data["pressure_msl"], unit="hPa"),
humidity=weather.RelativeHumidity(data["relative_humidity_2m"]),
cloud_cover=data["cloud_cover"],
)
diff --git a/searx/engines/wttr.py b/searx/engines/wttr.py
index aec4d1075..9c7f69b43 100644
--- a/searx/engines/wttr.py
+++ b/searx/engines/wttr.py
@@ -1,6 +1,8 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""wttr.in (weather forecast service)"""
+import typing as t
+
from urllib.parse import quote
from datetime import datetime
@@ -80,19 +82,19 @@ def request(query, params):
return params
-def _weather_data(location: weather.GeoLocation, data: dict):
+def _weather_data(location: weather.GeoLocation, data: dict[str, t.Any]):
# the naming between different data objects is inconsitent, thus temp_C and
# tempC are possible
tempC: float = data.get("temp_C") or data.get("tempC") # type: ignore
return WeatherAnswer.Item(
location=location,
- temperature=weather.Temperature(unit="°C", value=tempC),
+ temperature=weather.Temperature(val=tempC, unit="°C"),
condition=WWO_TO_CONDITION[data["weatherCode"]],
- feels_like=weather.Temperature(unit="°C", value=data["FeelsLikeC"]),
+ feels_like=weather.Temperature(val=data["FeelsLikeC"], unit="°C"),
wind_from=weather.Compass(int(data["winddirDegree"])),
- wind_speed=weather.WindSpeed(data["windspeedKmph"], unit="km/h"),
- pressure=weather.Pressure(data["pressure"], unit="hPa"),
+ wind_speed=weather.WindSpeed(val=data["windspeedKmph"], unit="km/h"),
+ pressure=weather.Pressure(val=data["pressure"], unit="hPa"),
humidity=weather.RelativeHumidity(data["humidity"]),
cloud_cover=data["cloudcover"],
)
diff --git a/searx/plugins/_core.py b/searx/plugins/_core.py
index ef8e5cf46..4b9db076e 100644
--- a/searx/plugins/_core.py
+++ b/searx/plugins/_core.py
@@ -1,7 +1,6 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=too-few-public-methods,missing-module-docstring
-
__all__ = ["PluginInfo", "Plugin", "PluginCfg", "PluginStorage"]
import abc
@@ -9,16 +8,17 @@ import importlib
import inspect
import logging
import re
-import typing
-from collections.abc import Sequence
+
+import typing as t
+from collections.abc import Generator
from dataclasses import dataclass, field
from searx.extended_types import SXNG_Request
-from searx.result_types import Result
-if typing.TYPE_CHECKING:
+if t.TYPE_CHECKING:
from searx.search import SearchWithPlugins
+ from searx.result_types import Result, EngineResults, LegacyResult # pyright: ignore[reportPrivateLocalImportUsage]
import flask
log: logging.Logger = logging.getLogger("searx.plugins")
@@ -42,7 +42,7 @@ class PluginInfo:
description: str
"""Short description of the *answerer*."""
- preference_section: typing.Literal["general", "ui", "privacy", "query"] | None = "general"
+ preference_section: t.Literal["general", "ui", "privacy", "query"] | None = "general"
"""Section (tab/group) in the preferences where this plugin is shown to the
user.
@@ -71,7 +71,7 @@ class Plugin(abc.ABC):
id: str = ""
"""The ID (suffix) in the HTML form."""
- active: typing.ClassVar[bool]
+ active: t.ClassVar[bool]
"""Plugin is enabled/disabled by default (:py:obj:`PluginCfg.active`)."""
keywords: list[str] = []
@@ -109,7 +109,7 @@ class Plugin(abc.ABC):
raise ValueError(f"plugin ID {self.id} contains invalid character (use lowercase ASCII)")
if not getattr(self, "log", None):
- pkg_name = inspect.getmodule(self.__class__).__package__ # type: ignore
+ pkg_name = inspect.getmodule(self.__class__).__package__ # pyright: ignore[reportOptionalMemberAccess]
self.log = logging.getLogger(f"{pkg_name}.{self.id}")
def __hash__(self) -> int:
@@ -120,7 +120,7 @@ class Plugin(abc.ABC):
return id(self)
- def __eq__(self, other: typing.Any):
+ def __eq__(self, other: t.Any):
"""py:obj:`Plugin` objects are equal if the hash values of the two
objects are equal."""
@@ -146,7 +146,7 @@ class Plugin(abc.ABC):
"""
return True
- def on_result(self, request: SXNG_Request, search: "SearchWithPlugins", result: Result) -> bool:
+ def on_result(self, request: SXNG_Request, search: "SearchWithPlugins", result: "Result") -> bool:
"""Runs for each result of each engine and returns a boolean:
- ``True`` to keep the result
@@ -166,7 +166,9 @@ class Plugin(abc.ABC):
"""
return True
- def post_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> None | Sequence[Result]:
+ def post_search(
+ self, request: SXNG_Request, search: "SearchWithPlugins"
+ ) -> "None | list[Result | LegacyResult] | EngineResults":
"""Runs AFTER the search request. Can return a list of
:py:obj:`Result <searx.result_types._base.Result>` objects to be added to the
final result list."""
@@ -196,7 +198,7 @@ class PluginStorage:
def __init__(self):
self.plugin_list = set()
- def __iter__(self):
+ def __iter__(self) -> Generator[Plugin]:
yield from self.plugin_list
def __len__(self):
@@ -207,7 +209,7 @@ class PluginStorage:
return [p.info for p in self.plugin_list]
- def load_settings(self, cfg: dict[str, dict[str, typing.Any]]):
+ def load_settings(self, cfg: dict[str, dict[str, t.Any]]):
"""Load plugins configured in SearXNG's settings :ref:`settings
plugins`."""
@@ -262,7 +264,7 @@ class PluginStorage:
break
return ret
- def on_result(self, request: SXNG_Request, search: "SearchWithPlugins", result: Result) -> bool:
+ def on_result(self, request: SXNG_Request, search: "SearchWithPlugins", result: "Result") -> bool:
ret = True
for plugin in [p for p in self.plugin_list if p.id in search.user_plugins]:
diff --git a/searx/plugins/time_zone.py b/searx/plugins/time_zone.py
index f54a9ce6c..9239a3fbc 100644
--- a/searx/plugins/time_zone.py
+++ b/searx/plugins/time_zone.py
@@ -1,12 +1,11 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=missing-module-docstring
-from __future__ import annotations
import typing as t
import datetime
-from flask_babel import gettext # type: ignore
+from flask_babel import gettext
from searx.result_types import EngineResults
from searx.weather import DateTime, GeoLocation
@@ -53,13 +52,13 @@ class SXNGPlugin(Plugin):
search_term = " ".join(query_parts).strip()
if not search_term:
- date_time = DateTime(time=datetime.datetime.now())
+ date_time = DateTime(datetime.datetime.now())
results.add(results.types.Answer(answer=date_time.l10n()))
return results
geo = GeoLocation.by_query(search_term=search_term)
if geo:
- date_time = DateTime(time=datetime.datetime.now(tz=geo.zoneinfo))
+ date_time = DateTime(datetime.datetime.now(tz=geo.zoneinfo))
tz_name = geo.timezone.replace('_', ' ')
results.add(
results.types.Answer(
diff --git a/searx/weather.py b/searx/weather.py
index c8f3cf973..30b5bfeee 100644
--- a/searx/weather.py
+++ b/searx/weather.py
@@ -14,11 +14,10 @@ __all__ = [
"GeoLocation",
]
-import typing
+import typing as t
import base64
import datetime
-import dataclasses
import zoneinfo
from urllib.parse import quote_plus
@@ -27,7 +26,8 @@ import babel
import babel.numbers
import babel.dates
import babel.languages
-import flask_babel
+import flask_babel # pyright: ignore[reportMissingTypeStubs]
+import msgspec
from searx import network
from searx.cache import ExpireCache, ExpireCacheCfg
@@ -120,8 +120,7 @@ def symbol_url(condition: "WeatherConditionType") -> str | None:
return data_url
-@dataclasses.dataclass
-class GeoLocation:
+class GeoLocation(msgspec.Struct, kw_only=True):
"""Minimal implementation of Geocoding."""
# The type definition was based on the properties from the geocoding API of
@@ -176,6 +175,8 @@ class GeoLocation:
ctx = "weather_geolocation_by_query"
cache = get_WEATHER_DATA_CACHE()
+ # {'name': 'Berlin', 'latitude': 52.52437, 'longitude': 13.41053,
+ # 'elevation': 74.0, 'country_code': 'DE', 'timezone': 'Europe/Berlin'}
geo_props = cache.get(search_term, ctx=ctx)
if not geo_props:
@@ -194,15 +195,14 @@ class GeoLocation:
if not results:
raise ValueError(f"unknown geo location: '{search_term}'")
location = results[0]
- return {field.name: location[field.name] for field in dataclasses.fields(cls)}
+ return {field_name: location[field_name] for field_name in cls.__struct_fields__}
-DateTimeFormats = typing.Literal["full", "long", "medium", "short"]
-DateTimeLocaleTypes = typing.Literal["UI"]
+DateTimeFormats = t.Literal["full", "long", "medium", "short"]
+DateTimeLocaleTypes = t.Literal["UI"]
-@typing.final
-class DateTime:
+class DateTime(msgspec.Struct):
"""Class to represent date & time. Essentially, it is a wrapper that
conveniently combines :py:obj:`datetime.datetime` and
:py:obj:`babel.dates.format_datetime`. A conversion of time zones is not
@@ -216,8 +216,7 @@ class DateTime:
as the value for the ``locale``.
"""
- def __init__(self, time: datetime.datetime):
- self.datetime = time
+ datetime: datetime.datetime
def __str__(self):
return self.l10n()
@@ -252,36 +251,35 @@ class DateTime:
return babel.dates.format_date(self.datetime, format=fmt, locale=locale)
-@typing.final
-class Temperature:
+TemperatureUnits: t.TypeAlias = t.Literal["°C", "°F", "K"]
+
+
+class Temperature(msgspec.Struct, kw_only=True):
"""Class for converting temperature units and for string representation of
measured values."""
- si_name = "Q11579"
+ val: float
+ unit: TemperatureUnits
- Units = typing.Literal["°C", "°F", "K"]
- """Supported temperature units."""
+ si_name: t.ClassVar[str] = "Q11579"
+ units: t.ClassVar[list[str]] = list(t.get_args(TemperatureUnits))
- units = list(typing.get_args(Units))
-
- def __init__(self, value: float, unit: Units):
- if unit not in self.units:
- raise ValueError(f"invalid unit: {unit}")
- self.si: float = convert_to_si( # pylint: disable=invalid-name
- si_name=self.si_name,
- symbol=unit,
- value=value,
- )
+ def __post_init__(self):
+ if self.unit not in self.units:
+ raise ValueError(f"invalid unit: {self.unit}")
def __str__(self):
return self.l10n()
- def value(self, unit: Units) -> float:
- return convert_from_si(si_name=self.si_name, symbol=unit, value=self.si)
+ def value(self, unit: TemperatureUnits) -> float:
+ if unit == self.unit:
+ return self.val
+ si_val = convert_to_si(si_name=self.si_name, symbol=self.unit, value=self.val)
+ return convert_from_si(si_name=self.si_name, symbol=unit, value=si_val)
def l10n(
self,
- unit: Units | None = None,
+ unit: TemperatureUnits | None = None,
locale: babel.Locale | GeoLocation | None = None,
template: str = "{value} {unit}",
num_pattern: str = "#,##0",
@@ -320,33 +318,35 @@ class Temperature:
return template.format(value=val_str, unit=unit)
-@typing.final
-class Pressure:
+PressureUnits: t.TypeAlias = t.Literal["Pa", "hPa", "cm Hg", "bar"]
+
+
+class Pressure(msgspec.Struct, kw_only=True):
"""Class for converting pressure units and for string representation of
measured values."""
- si_name = "Q44395"
-
- Units = typing.Literal["Pa", "hPa", "cm Hg", "bar"]
- """Supported units."""
+ val: float
+ unit: PressureUnits
- units = list(typing.get_args(Units))
+ si_name: t.ClassVar[str] = "Q44395"
+ units: t.ClassVar[list[str]] = list(t.get_args(PressureUnits))
- def __init__(self, value: float, unit: Units):
- if unit not in self.units:
- raise ValueError(f"invalid unit: {unit}")
- # pylint: disable=invalid-name
- self.si: float = convert_to_si(si_name=self.si_name, symbol=unit, value=value)
+ def __post_init__(self):
+ if self.unit not in self.units:
+ raise ValueError(f"invalid unit: {self.unit}")
def __str__(self):
return self.l10n()
- def value(self, unit: Units) -> float:
- return convert_from_si(si_name=self.si_name, symbol=unit, value=self.si)
+ def value(self, unit: PressureUnits) -> float:
+ if unit == self.unit:
+ return self.val
+ si_val = convert_to_si(si_name=self.si_name, symbol=self.unit, value=self.val)
+ return convert_from_si(si_name=self.si_name, symbol=unit, value=si_val)
def l10n(
self,
- unit: Units | None = None,
+ unit: PressureUnits | None = None,
locale: babel.Locale | GeoLocation | None = None,
template: str = "{value} {unit}",
num_pattern: str = "#,##0",
@@ -363,8 +363,10 @@ class Pressure:
return template.format(value=val_str, unit=unit)
-@typing.final
-class WindSpeed:
+WindSpeedUnits: t.TypeAlias = t.Literal["m/s", "km/h", "kn", "mph", "mi/h", "Bft"]
+
+
+class WindSpeed(msgspec.Struct, kw_only=True):
"""Class for converting speed or velocity units and for string
representation of measured values.
@@ -375,28 +377,28 @@ class WindSpeed:
(55.6 m/s)
"""
- si_name = "Q182429"
-
- Units = typing.Literal["m/s", "km/h", "kn", "mph", "mi/h", "Bft"]
- """Supported units."""
+ val: float
+ unit: WindSpeedUnits
- units = list(typing.get_args(Units))
+ si_name: t.ClassVar[str] = "Q182429"
+ units: t.ClassVar[list[str]] = list(t.get_args(WindSpeedUnits))
- def __init__(self, value: float, unit: Units):
- if unit not in self.units:
- raise ValueError(f"invalid unit: {unit}")
- # pylint: disable=invalid-name
- self.si: float = convert_to_si(si_name=self.si_name, symbol=unit, value=value)
+ def __post_init__(self):
+ if self.unit not in self.units:
+ raise ValueError(f"invalid unit: {self.unit}")
def __str__(self):
return self.l10n()
- def value(self, unit: Units) -> float:
- return convert_from_si(si_name=self.si_name, symbol=unit, value=self.si)
+ def value(self, unit: WindSpeedUnits) -> float:
+ if unit == self.unit:
+ return self.val
+ si_val = convert_to_si(si_name=self.si_name, symbol=self.unit, value=self.val)
+ return convert_from_si(si_name=self.si_name, symbol=unit, value=si_val)
def l10n(
self,
- unit: Units | None = None,
+ unit: WindSpeedUnits | None = None,
locale: babel.Locale | GeoLocation | None = None,
template: str = "{value} {unit}",
num_pattern: str = "#,##0",
@@ -413,23 +415,23 @@ class WindSpeed:
return template.format(value=val_str, unit=unit)
-@typing.final
-class RelativeHumidity:
- """Amount of relative humidity in the air. The unit is ``%``"""
+RelativeHumidityUnits: t.TypeAlias = t.Literal["%"]
+
- Units = typing.Literal["%"]
- """Supported unit."""
+class RelativeHumidity(msgspec.Struct):
+ """Amount of relative humidity in the air. The unit is ``%``"""
- units = list(typing.get_args(Units))
+ val: float
- def __init__(self, humidity: float):
- self.humidity = humidity
+ # there exists only one unit (%) --> set "%" as the final value (constant)
+ unit: t.ClassVar["t.Final[RelativeHumidityUnits]"] = "%"
+ units: t.ClassVar[list[str]] = list(t.get_args(RelativeHumidityUnits))
def __str__(self):
return self.l10n()
def value(self) -> float:
- return self.humidity
+ return self.val
def l10n(
self,
@@ -447,45 +449,50 @@ class RelativeHumidity:
return template.format(value=val_str, unit=unit)
-@typing.final
-class Compass:
- """Class for converting compass points and azimuth values (360°)"""
+CompassPoint: t.TypeAlias = t.Literal[
+ "N", "NNE", "NE", "ENE", "E", "ESE", "SE", "SSE", "S", "SSW", "SW", "WSW", "W", "WNW", "NW", "NNW"
+]
+"""Compass point type definition"""
- Units = typing.Literal["°", "Point"]
+CompassUnits: t.TypeAlias = t.Literal["°", "Point"]
- Point = typing.Literal[
- "N", "NNE", "NE", "ENE", "E", "ESE", "SE", "SSE", "S", "SSW", "SW", "WSW", "W", "WNW", "NW", "NNW"
- ]
- """Compass point type definition"""
- TURN = 360.0
+class Compass(msgspec.Struct):
+ """Class for converting compass points and azimuth values (360°)"""
+
+ val: "float | int | CompassPoint"
+ unit: CompassUnits = "°"
+
+ TURN: t.ClassVar[float] = 360.0
"""Full turn (360°)"""
- POINTS = list(typing.get_args(Point))
+ POINTS: t.ClassVar[list[CompassPoint]] = list(t.get_args(CompassPoint))
"""Compass points."""
- RANGE = TURN / len(POINTS)
+ RANGE: t.ClassVar[float] = TURN / len(POINTS)
"""Angle sector of a compass point"""
- def __init__(self, azimuth: float | int | Point):
- if isinstance(azimuth, str):
- if azimuth not in self.POINTS:
- raise ValueError(f"Invalid compass point: {azimuth}")
- azimuth = self.POINTS.index(azimuth) * self.RANGE
- self.azimuth = azimuth % self.TURN
+ def __post_init__(self):
+ if isinstance(self.val, str):
+ if self.val not in self.POINTS:
+ raise ValueError(f"Invalid compass point: {self.val}")
+ self.val = self.POINTS.index(self.val) * self.RANGE
+
+ self.val = self.val % self.TURN
+ self.unit = "°"
def __str__(self):
return self.l10n()
- def value(self, unit: Units):
- if unit == "Point":
- return self.point(self.azimuth)
+ def value(self, unit: CompassUnits):
+ if unit == "Point" and isinstance(self.val, float):
+ return self.point(self.val)
if unit == "°":
- return self.azimuth
+ return self.val
raise ValueError(f"unknown unit: {unit}")
@classmethod
- def point(cls, azimuth: float | int) -> Point:
+ def point(cls, azimuth: float | int) -> CompassPoint:
"""Returns the compass point to an azimuth value."""
azimuth = azimuth % cls.TURN
# The angle sector of a compass point starts 1/2 sector range before
@@ -496,7 +503,7 @@ class Compass:
def l10n(
self,
- unit: Units = "Point",
+ unit: CompassUnits = "Point",
locale: babel.Locale | GeoLocation | None = None,
template: str = "{value}{unit}",
num_pattern: str = "#,##0",
@@ -514,7 +521,7 @@ class Compass:
return template.format(value=val_str, unit=unit)
-WeatherConditionType = typing.Literal[
+WeatherConditionType = t.Literal[
# The capitalized string goes into to i18n/l10n (en: "Clear sky" -> de: "wolkenloser Himmel")
"clear sky",
"partly cloudy",
@@ -632,7 +639,7 @@ YR_WEATHER_SYMBOL_MAP = {
if __name__ == "__main__":
# test: fetch all symbols of the type catalog ..
- for c in typing.get_args(WeatherConditionType):
+ for c in t.get_args(WeatherConditionType):
symbol_url(condition=c)
_cache = get_WEATHER_DATA_CACHE()
diff --git a/searx/webutils.py b/searx/webutils.py
index e025c6b47..65fcdc10e 100644
--- a/searx/webutils.py
+++ b/searx/webutils.py
@@ -16,6 +16,7 @@ from typing import Iterable, List, Tuple, TYPE_CHECKING
from io import StringIO
from codecs import getincrementalencoder
+import msgspec
from flask_babel import gettext, format_date # type: ignore
from searx import logger, get_setting
@@ -147,6 +148,8 @@ def write_csv_response(csv: CSVWriter, rc: "ResultContainer") -> None: # pylint
class JSONEncoder(json.JSONEncoder): # pylint: disable=missing-class-docstring
def default(self, o):
+ if isinstance(o, msgspec.Struct):
+ return msgspec.to_builtins(o)
if isinstance(o, datetime):
return o.isoformat()
if isinstance(o, timedelta):