diff options
Diffstat (limited to 'tpblite/models')
-rw-r--r-- | tpblite/models/torrents.py | 135 | ||||
-rw-r--r-- | tpblite/models/utils.py | 73 |
2 files changed, 115 insertions, 93 deletions
diff --git a/tpblite/models/torrents.py b/tpblite/models/torrents.py index 2d8bc6a..ada282f 100644 --- a/tpblite/models/torrents.py +++ b/tpblite/models/torrents.py @@ -2,133 +2,132 @@ import re import unicodedata from bs4 import BeautifulSoup -#TODO: write better comments +# TODO: write better comments + def fileSizeStrToInt(size_str): - '''Converts file size given in *iB format to bytes integer''' - - unit_dict = {'KiB':(2**10), - 'MiB':(2**20), - 'GiB':(2**30), - 'TiB':(2**40)} + """Converts file size given in *iB format to bytes integer""" + + unit_dict = {"KiB": (2 ** 10), "MiB": (2 ** 20), "GiB": (2 ** 30), "TiB": (2 ** 40)} try: num = float(size_str[:-3]) unit = size_str[-3:] return int(num * unit_dict[unit]) except Exception as e: - raise AttributeError('Cannot determine filesize: {0}, error: {1}'.format(size_str,e)) - -class Torrent(object): - ''' + raise AttributeError( + "Cannot determine filesize: {0}, error: {1}".format(size_str, e) + ) + + +class Torrent: + """ Abstract class to contain info about torrent magnet link, file size, number of seeds, number of leeches etc. - ''' + """ + def __init__(self, html_row): self.html_row = html_row self.title = self._getTitle() self.seeds, self.leeches = self._getPeers() - self.upload_date, self.filesize, self.byte_size, self.uploader = self._getFileInfo() + self.upload_date, self.filesize, self.byte_size, self.uploader = ( + self._getFileInfo() + ) self.magnetlink = self._getMagnetLink() - + def __str__(self): - return '{0}, S: {1}, L: {2}, {3}'.format(self.title, - self.seeds, - self.leeches, - self.filesize) - + return "{0}, S: {1}, L: {2}, {3}".format( + self.title, self.seeds, self.leeches, self.filesize + ) + def __repr__(self): - return '<Torrent object: {}>'.format(self.title) + return "<Torrent object: {}>".format(self.title) def _getTitle(self): - return self.html_row.find('a', class_='detLink').string + return self.html_row.find("a", class_="detLink").string def _getMagnetLink(self): - tag = self.html_row.find('a', href=(re.compile('magnet'))) - link = tag.get('href') + tag = self.html_row.find("a", href=(re.compile("magnet"))) + link = tag.get("href") return link - + def _getPeers(self): - taglist = self.html_row.find_all('td', align='right') + taglist = self.html_row.find_all("td", align="right") return int(taglist[0].string), int(taglist[1].string) - + def _getFileInfo(self): - text = self.html_row.find('font', class_='detDesc').get_text() - t = text.split(',') - uptime = unicodedata.normalize('NFKD', t[0].replace('Uploaded ','').strip()) - size = unicodedata.normalize('NFKD', t[1].replace('Size ', '').strip()) + text = self.html_row.find("font", class_="detDesc").get_text() + t = text.split(",") + uptime = unicodedata.normalize("NFKD", t[0].replace("Uploaded ", "").strip()) + size = unicodedata.normalize("NFKD", t[1].replace("Size ", "").strip()) byte_size = fileSizeStrToInt(size) - uploader = unicodedata.normalize('NFKD', t[2].replace('ULed by ', '').strip()) + uploader = unicodedata.normalize("NFKD", t[2].replace("ULed by ", "").strip()) return uptime, size, byte_size, uploader - - -class Torrents(object): - ''' + + +class Torrents: + """ Torrent object, takes query response and parses into torrent list or dict. Has methods to select items from torrent list. - ''' - def __init__(self, search_str, html_source): - self.search_str = search_str - self.__search_set = None - + """ + + def __init__(self, html_source): self.html_source = html_source self.list = self._createTorrentList() - + def __str__(self): - return 'Torrents object: {} torrents'.format(len(self.list)) - + return "Torrents object: {} torrents".format(len(self.list)) + def __repr__(self): - return '<Torrents object: {} torrents>'.format(len(self.list)) - + return "<Torrents object: {} torrents>".format(len(self.list)) + def __iter__(self): return iter(self.list) def __len__(self): return len(self.list) - def __getitem__(self,index): + def __getitem__(self, index): return self.list[index] - @property - def _search_set(self): - if self.__search_set is None: - self.__search_set = set(filter(None, re.split(r'[\s.|\(|\)]',self.search_str.lower()))) - return self.__search_set - def _createTorrentList(self): - soup = BeautifulSoup(self.html_source, features='html.parser') + soup = BeautifulSoup(self.html_source, features="html.parser") if soup.body is None: - raise ConnectionError('Could not determine torrents (empty html body)') - rows = soup.body.find_all('tr') + raise ConnectionError("Could not determine torrents (empty html body)") + rows = soup.body.find_all("tr") torrents = [] for row in rows: - # Get the lowercase unique set from the row text - text_set = set(filter(None, re.split(r'[\s.|\(|\)]',row.text.lower()))) - # Check if search string is subset - if self._search_set.issubset(text_set): + if len(row.find_all("td", {"class": "vertTh"})) == 1: torrents.append(Torrent(row)) return torrents - - def getBestTorrent(self, min_seeds=30, min_filesize='1 GiB', max_filesize='4 GiB'): - '''Filters torrent list based on some constraints, then returns highest seeded torrent + + def getBestTorrent(self, min_seeds=30, min_filesize="1 GiB", max_filesize="4 GiB"): + """Filters torrent list based on some constraints, then returns highest seeded torrent :param min_seeds (int): minimum seed number filter :param min_filesize (str): minimum filesize in XiB form, eg. GiB :param max_filesize (str): maximum filesize in XiB form, eg. GiB - :return Torrent Object: Torrent with highest seed number, will return None if all are filtered out''' + :return Torrent Object: Torrent with highest seed number, will return None if all are filtered out""" if not isinstance(min_filesize, int): min_filesize = fileSizeStrToInt(min_filesize) if not isinstance(max_filesize, int): max_filesize = fileSizeStrToInt(max_filesize) - filtered_list = filter(lambda x: self._filterTorrent(x, min_seeds, min_filesize, max_filesize), self.list) + filtered_list = filter( + lambda x: self._filterTorrent(x, min_seeds, min_filesize, max_filesize), + self.list, + ) sorted_list = sorted(filtered_list, key=lambda x: x.seeds, reverse=True) if len(sorted_list) > 0: return sorted_list[0] else: - print('No torrents found given criteria') + print("No torrents found given criteria") return None - + def _filterTorrent(self, torrent, min_seeds, min_filesize, max_filesize): - if (torrent.seeds < min_seeds) or (torrent.byte_size < min_filesize) or (torrent.byte_size > max_filesize): + if ( + (torrent.seeds < min_seeds) + or (torrent.byte_size < min_filesize) + or (torrent.byte_size > max_filesize) + ): return False else: - return True
\ No newline at end of file + return True diff --git a/tpblite/models/utils.py b/tpblite/models/utils.py index 1d6b351..eb24f8c 100644 --- a/tpblite/models/utils.py +++ b/tpblite/models/utils.py @@ -1,40 +1,63 @@ +from typing import Tuple, Type import random from urllib.request import Request, urlopen import urllib.error from purl import URL as pURL -class QueryParser(object): - ''' - Query object capable of getting html response given - a search query and other parameters. - ''' - def __init__(self, query, base_url, page, order, category): +class QueryParser: + """Query object capable of getting html response given a search query and other + parameters. + """ + + # PirateBay URL to use for queries + base_url: str + + # Compiled search string used to query the PirateBay URL + url: str + + def __init__(self, base_url: str, segments: Tuple[str, ...]): self.base_url = base_url - segments = ('search', query, str(page), str(order), str(category)) self.url = URL(base_url, segments) try: self.html_source = self._sendRequest() except urllib.error.URLError: - raise ConnectionError('Could not establish connection wtih {}'.format(self.base_url)) - + raise ConnectionError( + "Could not establish connection wtih {}".format(self.base_url) + ) + + @classmethod + def from_search( + cls, query: str, base_url: str, page: int, order: int, category: int + ): + segments = ("search", query, str(page), str(order), str(category)) + return cls(base_url, segments) + + @classmethod + def from_browse(cls, base_url: str, category: int, page: int, order: int): + print("browsing") + segments = ("browse", str(category), str(page), str(order), "0") + + return cls(base_url, segments) + def _sendRequest(self): req = Request(self.url, headers=headers()) return urlopen(req).read() -def URL(base, segments): + +def URL(base: str, segments: Tuple[str, ...]) -> str: u = pURL().from_string(base) url = u.path_segments(segments) return url.as_string() def headers(): - ''' + """ The Pirate Bay blocks requests (403 Forbidden) basing on User-Agent header, so it's probably better to rotate them. User-Agents taken from: https://techblog.willshouse.com/2012/01/03/most-common-user-agents/ - ''' + """ return { "User-Agent": random.choice(USER_AGENTS), "origin_req_host": "thepiratebay.se", @@ -42,16 +65,16 @@ def headers(): USER_AGENTS = ( - 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' - 'AppleWebKit/537.36 (KHTML, like Gecko) ' - 'Chrome/60.0.3112.113 Safari/537.36', - 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' - 'AppleWebKit/537.36 (KHTML, like Gecko) ' - 'Chrome/60.0.3112.101 Safari/537.36', - 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) ' - 'AppleWebKit/537.36 (KHTML, like Gecko) ' - 'Chrome/60.0.3112.113 Safari/537.36', - 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) ' - 'AppleWebKit/537.36 (KHTML, like Gecko) ' - 'Chrome/60.0.3112.113 Safari/537.36', -)
\ No newline at end of file + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/60.0.3112.113 Safari/537.36", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/60.0.3112.101 Safari/537.36", + "Mozilla/5.0 (Windows NT 6.1; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/60.0.3112.113 Safari/537.36", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/60.0.3112.113 Safari/537.36", +) |