nixpkgs/pkgs/desktops/gnome/extensions/update-extensions.py

444 lines
15 KiB
Python
Executable File

#!/usr/bin/env nix-shell
#!nix-shell -I nixpkgs=../../../.. -i python3 -p python3
### After making change:
### - Format the script by running: nix run nixpkgs#black pkgs/desktops/gnome/extensions/update-extensions.py
### - Run the unit test by running: python3 -m unittest pkgs/desktops/gnome/extensions/update-extensions.py
### - Run the type checking by running: nix run nixpkgs#mypy pkgs/desktops/gnome/extensions/update-extensions.py
import base64
import json
import logging
import argparse
import unittest
import subprocess
import urllib.error
import urllib.request
from contextlib import contextmanager
from operator import itemgetter
from pathlib import Path
from typing import Any
# We don't want all those deprecated legacy extensions
# Group extensions by GNOME "major" version for compatibility reasons
supported_versions = {
"38": "3.38",
"40": "40",
"41": "41",
"42": "42",
"43": "43",
"44": "44",
"45": "45",
"46": "46",
"47": "47",
"48": "48",
}
# shell versions that we want to put into the gnomeExtensions attr set
versions_to_merge = ["46", "47", "48"]
# Some type alias to increase readability of complex compound types
PackageName = str
ShellVersion = str
Uuid = str
ExtensionVersion = int
updater_dir_path = Path(__file__).resolve().parent
def fetch_extension_data(uuid: str, version: str) -> tuple[str, str]:
"""
Download the extension and hash it. We use `nix-prefetch-url` for this for efficiency reasons.
Returns a tuple with the hash (Nix-compatible) of the zip file's content and the base64-encoded content of its metadata.json.
"""
# The download URLs follow this schema
uuid = uuid.replace("@", "")
url: str = (
f"https://extensions.gnome.org/extension-data/{uuid}.v{version}.shell-extension.zip"
)
# Download extension and add the zip content to nix-store
for _ in range(0, 10):
process = subprocess.run(
["nix-prefetch-url", "--unpack", "--print-path", url],
capture_output=True,
text=True,
)
if process.returncode == 0:
break
else:
logging.warning(f"Nix-prefetch-url failed for {url}:")
logging.warning(f"Stderr: {process.stderr}")
logging.warning("Retrying")
if process.returncode != 0:
raise Exception(
"Retried 10 times, but still failed to download the extension. Exiting."
)
lines = process.stdout.splitlines()
# Get hash from first line of nix-prefetch-url output
hash = lines[0].strip()
# Get path from second line of nix-prefetch-url output
path = Path(lines[1].strip())
# Get metadata.json content from nix-store
with open(path / "metadata.json", "r") as out:
metadata = base64.b64encode(out.read().encode("ascii")).decode()
return hash, metadata
def generate_extension_versions(
extension_version_map: dict[ShellVersion, ExtensionVersion],
uuid: str,
) -> dict[ShellVersion, dict[str, str]]:
"""
Takes in a mapping from shell versions to extension versions and transforms it the way we need it:
- Only take one extension version per GNOME Shell major version (as per `supported_versions`)
- Filter out versions that only support old GNOME versions
- Download the extension and hash it
"""
# Determine extension version per shell version
extension_versions: dict[ShellVersion, ExtensionVersion] = {}
for shell_version, version_prefix in supported_versions.items():
# Newest compatible extension version
extension_version: int | None = max(
(
int(ext_ver)
for shell_ver, ext_ver in extension_version_map.items()
if (shell_ver.startswith(version_prefix))
),
default=None,
)
# Extension is not compatible with this GNOME version
if not extension_version:
continue
extension_versions[shell_version] = extension_version
# Download information once for all extension versions chosen above
extension_info_cache: dict[ExtensionVersion, tuple[str, str]] = {}
for extension_version in sorted(set(extension_versions.values())):
logging.debug(f"[{uuid}] Downloading v{extension_version}")
extension_info_cache[extension_version] = fetch_extension_data(
uuid,
str(extension_version),
)
# Fill map
extension_versions_full: dict[ShellVersion, dict[str, str]] = {}
for shell_version, extension_version in extension_versions.items():
sha256, metadata = extension_info_cache[extension_version]
extension_versions_full[shell_version] = {
"version": str(extension_version),
"sha256": sha256,
# The downloads are impure, their metadata.json may change at any time.
# Thus, we back it up / pin it to remain deterministic
# Upstream issue: https://gitlab.gnome.org/Infrastructure/extensions-web/-/issues/137
"metadata": metadata,
}
return extension_versions_full
def pname_from_url(url: str) -> tuple[str, str]:
"""
Parse something like "/extension/1475/battery-time/" and output ("battery-time", "1475")
"""
url = url.split("/") # type: ignore
return url[3], url[2]
def process_extension(extension: dict[str, Any]) -> dict[str, Any] | None:
"""
Process an extension. It takes in raw scraped data and downloads all the necessary information that buildGnomeExtension.nix requires
Input: a json object of one extension queried from the site. It has the following schema (only important key listed):
{
"uuid": str,
"name": str,
"description": str,
"link": str,
"shell_version_map": {
str: { "version": int, … },
},
}
"uuid" is an extension UUID that looks like this (most of the time): "extension-name@username.domain.tld".
Don't make any assumptions on it, and treat it like an opaque string!
"link" follows the following schema: "/extension/$number/$string/"
The number is monotonically increasing and unique to every extension.
The string is usually derived from the extension name (but shortened, kebab-cased and URL friendly).
It may diverge from the actual name.
The keys of "shell_version_map" are GNOME Shell version numbers.
Output: a json object to be stored, or None if the extension should be skipped. Schema:
{
"uuid": str,
"name": str,
"pname": str,
"description": str,
"link": str,
"shell_version_map": {
str: { "version": int, "sha256": str, "metadata": <hex> },
}
}
Only "uuid" gets passed along unmodified. "name", "description" and "link" are taken from the input, but sanitized.
"pname" gets generated from other fields and "shell_version_map" has a completely different structure than the input
field with the same name.
"""
uuid = extension["uuid"]
# Yeah, there are some extensions without any releases
if not extension["shell_version_map"]:
return None
logging.info(f"Processing '{uuid}'")
# Input is a mapping str -> { version: int, … }
# We want to map shell versions to extension versions
shell_version_map: dict[ShellVersion, int] = {
k: v["version"] for k, v in extension["shell_version_map"].items()
}
# Transform shell_version_map to be more useful for us. Also throw away unwanted versions
shell_version_map: dict[ShellVersion, dict[str, str]] = generate_extension_versions(shell_version_map, uuid) # type: ignore
# No compatible versions found
if not shell_version_map:
return None
# Fetch a human-readable name for the package.
(pname, _pname_id) = pname_from_url(extension["link"])
return {
"uuid": uuid,
"name": extension["name"],
"pname": pname,
"description": extension["description"],
"link": "https://extensions.gnome.org" + extension["link"],
"shell_version_map": shell_version_map,
}
@contextmanager
def request(url: str, retries: int = 5, retry_codes: list[int] = [500, 502, 503, 504]):
req = urllib.request.Request(
url,
headers={
"User-Agent": "NixpkgsGnomeExtensionUpdate (+https://github.com/NixOS/nixpkgs)"
},
)
for attempt in range(retries + 1):
try:
with urllib.request.urlopen(req) as response:
yield response
break
except urllib.error.HTTPError as e:
if e.code in retry_codes and attempt < retries:
logging.warning(f"Error while fetching {url}. Retrying: {e}")
else:
raise e
def scrape_extensions_index() -> list[dict[str, Any]]:
"""
Scrape the list of extensions by sending search queries to the API. We simply go over it
page by page until we hit a non-full page or a 404 error.
The returned list is sorted by the age of the extension, in order to be deterministic.
"""
page = 0
extensions = []
while True:
page += 1
logging.info("Scraping page " + str(page))
try:
with request(
f"https://extensions.gnome.org/extension-query/?n_per_page=25&page={page}"
) as response:
data = json.loads(response.read().decode())["extensions"]
response_length = len(data)
for extension in data:
extensions.append(extension)
# If our page isn't "full", it must have been the last one
if response_length < 25:
logging.debug(
f"\tThis page only has {response_length} entries, so it must be the last one."
)
break
except urllib.error.HTTPError as e:
if e.code == 404:
# We reached past the last page and are done now
break
else:
raise
# `pk` is the primary key in the extensions.gnome.org database. Sorting on it will give us a stable,
# deterministic ordering.
extensions.sort(key=itemgetter("pk"))
return extensions
def fetch_extensions() -> list[dict[str, Any]]:
raw_extensions = scrape_extensions_index()
logging.info(f"Downloaded {len(raw_extensions)} extensions. Processing …")
processed_extensions: list[dict[str, Any]] = []
for num, raw_extension in enumerate(raw_extensions):
processed_extension = process_extension(raw_extension)
if processed_extension:
processed_extensions.append(processed_extension)
logging.debug(f"Processed {num + 1} / {len(raw_extensions)}")
return processed_extensions
def serialize_extensions(processed_extensions: list[dict[str, Any]]) -> None:
# We micro-manage a lot of the serialization process to keep the diffs optimal.
# We generally want most of the attributes of an extension on one line,
# but then each of its supported versions with metadata on a new line.
with open(updater_dir_path / "extensions.json", "w") as out:
for index, extension in enumerate(processed_extensions):
# Manually pretty-print the outermost array level
if index == 0:
out.write("[ ")
else:
out.write(", ")
# Dump each extension into a single-line string forst
serialized_extension = json.dumps(extension, ensure_ascii=False)
# Inject line breaks for each supported version
for version in supported_versions:
# This one only matches the first entry
serialized_extension = serialized_extension.replace(
f'{{"{version}": {{',
f'{{\n "{version}": {{',
)
# All other entries
serialized_extension = serialized_extension.replace(
f', "{version}": {{',
f',\n "{version}": {{',
)
# One last line break around the closing braces
serialized_extension = serialized_extension.replace("}}}", "}\n }}")
out.write(serialized_extension)
out.write("\n")
out.write("]\n")
def find_collisions(
extensions: list[dict[str, Any]],
versions: list[str],
) -> dict[PackageName, list[Uuid]]:
package_name_registry: dict[PackageName, set[Uuid]] = {}
for extension in extensions:
pname = extension["pname"]
uuid = extension["uuid"]
for shell_version in versions:
if shell_version in extension["shell_version_map"]:
package_name_registry.setdefault(pname, set()).add(uuid)
return {
pname: sorted(uuids)
for pname, uuids in sorted(package_name_registry.items())
if len(uuids) > 1
}
def main() -> None:
logging.basicConfig(level=logging.DEBUG)
parser = argparse.ArgumentParser()
parser.add_argument(
"--skip-fetch",
action="store_true",
help="Skip fetching extensions. When this option is set, the script does not fetch extensions from the internet, but checks for name collisions.",
)
args = parser.parse_args()
if not args.skip_fetch:
processed_extensions = fetch_extensions()
serialize_extensions(processed_extensions)
logging.info(
f"Done. Writing results to extensions.json ({len(processed_extensions)} extensions in total)"
)
with open(updater_dir_path / "extensions.json", "r") as out:
extensions = json.load(out)
collisions = find_collisions(extensions, versions_to_merge)
with open(updater_dir_path / "collisions.json", "w") as out:
json.dump(collisions, out, indent=2, ensure_ascii=False)
out.write("\n")
logging.info(
"Done. Writing name collisions to collisions.json (please check manually)"
)
if __name__ == "__main__":
main()
class FindCollisions(unittest.TestCase):
extensions = [
{
"pname": "foo",
"uuid": "foo_38_to_40@doe.example",
"shell_version_map": {
"38": {},
"40": {},
},
},
{
"pname": "bar",
"uuid": "bar_42_to_45@chulsoo.example",
"shell_version_map": {
"42": {},
"43": {},
"44": {},
"45": {},
},
},
{
"pname": "bar",
"uuid": "bar_44_to_47@younghee.example",
"shell_version_map": {
"44": {},
"45": {},
"46": {},
"47": {},
},
},
]
def test_no_collision(self) -> None:
self.assertEqual(
{},
find_collisions(self.extensions, ["40", "41", "42"]),
)
def test_collision(self) -> None:
self.assertEqual(
{
"bar": [
"bar_42_to_45@chulsoo.example",
"bar_44_to_47@younghee.example",
],
},
find_collisions(self.extensions, ["45", "46", "47"]),
)