Source code for unidown.plugin.a_plugin

import json
import logging
import time
from abc import ABC, abstractmethod
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from typing import Any, Optional

import certifi
import pkg_resources
import urllib3
from packaging.version import Version
from tqdm import tqdm
from urllib3.exceptions import HTTPError

from unidown import tools
from unidown.core.settings import Settings
from unidown.plugin.exceptions import PluginError
from unidown.plugin.link_item_dict import LinkItemDict
from unidown.plugin.plugin_info import PLUGIN_INFO_EMPTY, PluginInfo
from unidown.plugin.savestate import SaveState


[docs]class APlugin(ABC): # noqa: PLR0904 """ Abstract class of a plugin. Provides all needed variables and methods. :param options: Parameters which can include optional parameters. :raises ~unidown.plugin.exceptions.PluginError: Can not create default plugin paths. """ #: Meta information about the plugin. _INFO: PluginInfo = PLUGIN_INFO_EMPTY #: Savestate class to use. _SAVESTATE_CLS: type[SaveState] = SaveState def __init__(self, settings: Settings, options: Optional[dict[str, Any]] = None) -> None: if options is None: options = {} if self._INFO == PLUGIN_INFO_EMPTY: raise ValueError("info is not set.") #: If the tqdm progressbar should be disabled. self._disable_tqdm: bool = settings.disable_tqdm #: Use this for logging. self._log: logging.Logger = logging.getLogger(self._INFO.name) #: Number of simultaneous downloads. self._simul_downloads: int = settings.cores #: Path where the plugin can place all temporary data. self._temp_dir: Path = settings.temp_dir.joinpath(self.name) #: General download path of the plugin. self._download_dir: Path = settings.download_dir.joinpath(self.name) #: File which contains the latest savestate of the plugin. self._savestate_file: Path = settings.savestate_dir.joinpath(f"{self.name}_save.json") try: self._temp_dir.mkdir(parents=True, exist_ok=True) self._download_dir.mkdir(parents=True, exist_ok=True) self._savestate_file.parent.mkdir(parents=True, exist_ok=True) except PermissionError: raise PluginError('Can not create default plugin paths, due to a permission error.') # noqa: PLW0707 # cached data #: Latest update time of the referencing data. self._last_update: datetime = datetime(1970, 1, 1) # noqa: WPS432 #: Referencing data. self._download_data: LinkItemDict = LinkItemDict() #: Savestate of the plugin. self._savestate: SaveState = self._SAVESTATE_CLS(self.info, self.last_update, LinkItemDict()) #: The unit which will be displayed in the progress bar. self._unit: str = 'item' #: Downloader which will download the data. self._downloader: urllib3.HTTPSConnectionPool = urllib3.HTTPSConnectionPool( self.info.host, maxsize=self._simul_downloads, cert_reqs='CERT_REQUIRED', ca_certs=certifi.where() ) # load options #: Options which the plugin uses internally, should be used for the given options at initialization. self._options: dict[str, Any] = options self._load_default_options() def __eq__(self, other: object) -> bool: """ Two plugins are equal when having the same meta information. """ if not isinstance(other, self.__class__): return False return self.info == other.info def __ne__(self, other: object) -> bool: # noqa: D105 return not self.__eq__(other) def __hash__(self) -> int: # noqa: D105 return hash(self.info) @property def log(self) -> logging.Logger: """ Plain getter. """ return self._log @property def simul_downloads(self) -> int: """ Plain getter. """ return self._simul_downloads @property def info(self) -> PluginInfo: """ Plain getter. """ return self._INFO @property def host(self) -> str: """ Plain getter. """ return self._INFO.host @property def name(self) -> str: """ Plain getter. """ return self._INFO.name @property def version(self) -> Version: """ Plain getter. """ return self._INFO.version @property def temp_dir(self) -> Path: """ Plain getter. """ return self._temp_dir @property def download_dir(self) -> Path: """ Plain getter. """ return self._download_dir @property def savestate(self) -> SaveState: """ Plain getter. """ return self._savestate @property def last_update(self) -> datetime: """ Plain getter. """ return self._last_update @property def download_data(self) -> LinkItemDict: """ Plain getter. """ return self._download_data @property def unit(self) -> str: """ Plain getter. """ return self._unit @property def options(self) -> dict[str, Any]: """ Plain getter. """ return self._options
[docs] def load_savestate(self) -> None: """ Load the save of the plugin. :raises ~unidown.plugin.exceptions.PluginError: Broken savestate json. :raises ~unidown.plugin.exceptions.PluginError: Different savestate versions. :raises ~unidown.plugin.exceptions.PluginError: Different plugin versions. :raises ~unidown.plugin.exceptions.PluginError: Different plugin names. :raises ~unidown.plugin.exceptions.PluginError: Could not parse the json. """ if not self._savestate_file.exists(): self.log.info("No savestate file found.") return with self._savestate_file.open(encoding="utf8") as reader: try: savestate_json = json.loads(reader.read()) except Exception: raise PluginError( # noqa: PLW0707 f"Broken savestate json. Please fix or delete this file (you may lose data in this case): {self._savestate_file}" ) try: savestate = self._SAVESTATE_CLS.from_json(savestate_json) except Exception as ex: raise PluginError(f"Could not load savestate from json {self._savestate_file}: {ex}") # noqa: PLW0707 else: del savestate_json # noqa: WPS420 savestate = self._SAVESTATE_CLS.upgrade(savestate) if savestate.plugin_info.name != self.info.name: raise PluginError(f"Save state plugin ({savestate.plugin_info.name}) does not match the current ({self.name}).") self._savestate = savestate
[docs] def update_last_update(self) -> None: """ Call this to update the latest update time. Calls :func:`~unidown.plugin.a_plugin.APlugin._create_last_update_time`. """ self._last_update = self._create_last_update_time()
[docs] def update_download_data(self) -> None: """ Update the download links. Calls :func:`~unidown.plugin.a_plugin.APlugin._create_download_data`. """ self._download_data = self._create_download_data()
[docs] def download(self, link_items: LinkItemDict, folder: Path, desc: str, unit: str) -> None: """ Download the given LinkItem dict from the plugins host, to the given path. Proceeded with multiple connections. :attr:`~unidown.plugin.a_plugin.APlugin._simul_downloads`. After :func:`~unidown.plugin.a_plugin.APlugin.check_download` is recommended. This function don't use an internal `link_item_dict`, `delay` or `folder` directly set in options or instance vars, because it can be used aside of the normal download routine inside the plugin itself for own things. As of this it still needs access to the logger, so a staticmethod is not possible. .. warning:: The parameters may change in future versions. (e.g. change order and accept another host) :param link_items: Data which gets downloaded. :param folder: Target download folder. :param desc: Description of the progressbar. :param unit: Unit of the download, shown in the progressbar. """ if not link_items: return job_list: list[Future] = [] with ThreadPoolExecutor(max_workers=self._simul_downloads) as executor: for link, item in link_items.items(): job: Future = executor.submit(self.download_as_file, link, folder.joinpath(item.name), self._options['delay']) job_list.append(job) pbar = tqdm(as_completed(job_list), total=len(job_list), desc=desc, unit=unit, mininterval=1, ncols=100, disable=self._disable_tqdm) for _ in pbar: # noqa: WPS328 pass # noqa: WPS420 for job in job_list: # noqa: WPS440 try: job.result() except HTTPError as ex: self.log.warning("Failed to download: %s", str(ex))
[docs] def download_as_file(self, url: str, target_file: Path, delay: float = 0) -> str: """ Download the given url to the given target folder. :param url: Link. :param target_file: Target file. :param delay: Delay after each download. Delay is in seconds. :return: Url. :raises ~urllib3.exceptions.HTTPError: Connection had an error. """ if target_file.exists(): new_name = target_file while new_name.exists(): new_name = new_name.with_name(f"{new_name.stem}_r{''.join(new_name.suffixes)}") target_file.rename(new_name) self.log.critical("target file exists! renaming '%s' to '%s'", target_file, new_name) with self._downloader.request('GET', url, preload_content=False, retries=urllib3.util.retry.Retry(3)) as reader: if reader.status == 200: # noqa: WPS432 with target_file.open(mode='wb') as writer: writer.write(reader.data) else: raise HTTPError(f"{url} | {reader.status}") if delay > 0: time.sleep(delay) return url
[docs] def check_download(self, link_item_dict: LinkItemDict, folder: Path, log: bool = False) -> tuple[LinkItemDict, LinkItemDict]: """ Check if the download of the given dict was successful. No proving if the content of the file is correct too. :param link_item_dict: Items to check. :param folder: Folder where the downloads are saved. :param log: Log lost items. :return: Succeed. """ succeed = LinkItemDict({link: item for link, item in link_item_dict.items() if folder.joinpath(item.name).is_file()}) failed = LinkItemDict({link: item for link, item in link_item_dict.items() if link not in succeed}) if failed and log: for link, item in failed.items(): self.log.warning("Not downloaded: %s%s - %s", self.info.host, link, item.name) return succeed, failed
[docs] def update_savestate(self, new_items: LinkItemDict) -> None: """ Update savestate. :param new_items: New items. """ self._savestate.plugin_info = self.info self._savestate.last_update = self.last_update self._savestate.link_items.actualize(new_items)
[docs] def save_savestate(self) -> None: """ Save meta data about the downloaded things and the plugin to a file. """ with self._savestate_file.open(mode='w', encoding="utf8") as writer: writer.write(json.dumps(self._savestate.to_json()))
[docs] def clean_up(self) -> None: """ Clean up for a module. Is deleting :attr:`~unidown.plugin.a_plugin.APlugin._temp_dir`. """ self._downloader.close() tools.unlink_dir_rec(self._temp_dir)
[docs] @abstractmethod def _create_last_update_time(self) -> datetime: """ Get the newest update time from the referencing data. .. note:: Has to be implemented inside Plugin. :raises NotImplementedError: Abstract method. """ raise NotImplementedError
[docs] @abstractmethod def _create_download_data(self) -> LinkItemDict: """ Get the download links in a specific format. .. note:: Has to be implemented inside Plugins. :raise NotImplementedError: Abstract method. """ raise NotImplementedError
[docs] def _load_default_options(self) -> None: """ Load default options if they were not passed at creation. """ delay: Any = self._options.get('delay') if delay is None: self._options['delay'] = 0 self.log.warning("Plugin option 'delay' is missing. Using no delay.") elif not isinstance(delay, float): try: self._options['delay'] = float(delay) except ValueError: self._options['delay'] = 0 self.log.warning("Plugin option 'delay' was not a float. Using no delay.")
[docs]def get_plugins() -> dict[str, pkg_resources.EntryPoint]: """ Get all available plugins for unidown. :return: Plugin name list. """ return {entry.name: entry for entry in pkg_resources.iter_entry_points('unidown.plugin')}