Source code for psnawp_api.utils.misc

"""Miscellaneous functions that are used throughout the psnawp module."""

from __future__ import annotations

import base64
import binascii
import json
import os
import re
from datetime import datetime
from pathlib import Path
from tempfile import gettempdir
from typing import TYPE_CHECKING, cast

from pycountry import countries

from psnawp_api.core.psnawp_exceptions import PSNAWPInvalidTokenError

if TYPE_CHECKING:
    from pycountry.db import Country

ISO_3166_1_ALPHA_2_LEN = 2


[docs] def iso_format_to_datetime(iso_format: str | None) -> datetime | None: """Converts an ISO 8601 formatted string to a :py:class:`~datetime.datetime` object. :param iso_format: The ISO 8601 formatted string (e.g., "2025-02-26T12:00:00Z"). :returns: The corresponding :py:class:`~datetime.datetime` object, or ``None`` if input is ``None``. """ return datetime.fromisoformat(iso_format.replace("Z", "+00:00")) if iso_format is not None else None
[docs] def extract_region_from_npid(npid: str) -> Country | None: """Extracts the region code from a base64-encoded NPID string and converts it to a full country. The function decodes the NPID, extracts the last two characters representing the ISO 3166-1 alpha-2 region code, and uses the `pycountry` library to map it to a full country. :param npid: The base64-encoded NPID string, which is decoded to extract the region code. :returns: The region as a :py:class:`~pycountry.db.Country` object, or ``None`` if extraction fails or region is invalid. This function assumes a valid NPID format (e.g., `"VaultTec-Co@b7.us"`). It splits the string to get the region code (e.g., "US"), which is then matched to the full country using the `pycountry` library. """ try: decoded_npid = base64.b64decode(npid).decode("utf-8") except (binascii.Error, UnicodeDecodeError): return None # Assuming a valid decoded npid format (e.g. VaultTec-Co@b7.us), extract the region (e.g. "US") if "@" in decoded_npid and "." in decoded_npid: region_candidate = decoded_npid.split(".")[-1] if len(region_candidate) == ISO_3166_1_ALPHA_2_LEN and region_candidate.isalpha(): return cast("Country", countries.get(alpha_2=region_candidate)) return None
[docs] def parse_npsso_token(npsso_input: str) -> str: """Accept string from the user that may contain either a valid npsso token or a json string with key "npsso" and value of the npsso token. This function either succeeds at extracting the npsso token from the provided input (meaning a valid npsso json string was provided) or it returns the original input. :param npsso_input: User provided input for npsso token. :returns: Extracted npsso token from user input or the original string. :raises PSNAWPInvalidTokenError: If malformed npsso JSON is supplied """ pattern = r"\{|\}" if re.search(pattern, npsso_input): try: npsso_dict: dict[str, str] = json.loads(npsso_input) return npsso_dict["npsso"] except json.JSONDecodeError as exp: raise PSNAWPInvalidTokenError("Malformed JSON passed as input.") from exp except KeyError as exp: raise PSNAWPInvalidTokenError('Input JSON is missing the "npsso" key') from exp return npsso_input
[docs] def get_temp_db_path(filename: str = "psnawp_limiter.sqlite") -> Path: """Create a writable temporary directory and database file. :param filename: Name of the SQLite database file. :returns: Path to the writable SQLite database. """ temp_dir = Path(gettempdir()) / f"psnawp-{os.getpid()}" temp_dir.mkdir(parents=True, exist_ok=True) db_path = temp_dir / filename db_path.touch(exist_ok=True) return db_path