From ae50a36d2c8e9146b2badc7084d904fbc166a0c0 Mon Sep 17 00:00:00 2001 From: magmadude <8989953+magmadude20@users.noreply.github.com> Date: Fri, 8 Nov 2024 22:13:25 -0600 Subject: [PATCH 1/2] Add SearchResult class that encapsulates logic for title matching and fetching info hash. --- comet/api/stream.py | 359 ++++++++++++++++++++++------------------ comet/search/results.py | 71 ++++++++ comet/utils/general.py | 130 +++------------ 3 files changed, 296 insertions(+), 264 deletions(-) create mode 100644 comet/search/results.py diff --git a/comet/api/stream.py b/comet/api/stream.py index 8bbcb2d..62052b2 100644 --- a/comet/api/stream.py +++ b/comet/api/stream.py @@ -23,8 +23,6 @@ get_zilean, get_torrentio, get_mediafusion, - filter, - get_torrent_hash, translate, get_balanced_hashes, format_title, @@ -38,6 +36,20 @@ streams = APIRouter() +def error_result(error: str): + return { + "name": "[⚠️] Comet", + "description": error, + "url": "https://comet.fast", + } + + +def stream_lookup_error(error: str): + return { + "streams": [error_result(error)] + } + + @streams.get("/stream/{type}/{id}.json") async def stream_noconfig(request: Request, type: str, id: str): return { @@ -61,15 +73,7 @@ async def stream( ): config = config_check(b64config) if not config: - return { - "streams": [ - { - "name": "[⚠️] Comet", - "description": "Invalid Comet config.", - "url": "https://comet.fast", - } - ] - } + return stream_lookup_error("Invalid Comet config.") connector = aiohttp.TCPConnector(limit=0) async with aiohttp.ClientSession( @@ -120,21 +124,99 @@ async def stream( except Exception as e: logger.warning(f"Exception while getting metadata for {id}: {e}") - return { - "streams": [ - { - "name": "[⚠️] Comet", - "description": f"Can't get metadata for {id}", - "url": "https://comet.fast", - } - ] - } + return stream_lookup_error(f"Can't get metadata for {id}") name = translate(name) log_name = name if type == "series": log_name = f"{name} S{season:02d}E{episode:02d}" + cache_key = hashlib.md5( + json.dumps( + { + "debridService": config["debridService"], + "name": name, + "season": season, + "episode": episode, + "indexers": config["indexers"], + } + ).encode("utf-8") + ).hexdigest() + cached = await database.fetch_one( + f"SELECT EXISTS (SELECT 1 FROM cache WHERE cacheKey = '{cache_key}')" + ) + if cached[0] != 0: + logger.info(f"Cache found for {log_name}") + + timestamp = await database.fetch_one( + f"SELECT timestamp FROM cache WHERE cacheKey = '{cache_key}'" + ) + if timestamp[0] + settings.CACHE_TTL < time.time(): + await database.execute( + f"DELETE FROM cache WHERE cacheKey = '{cache_key}'" + ) + + logger.info(f"Cache expired for {log_name}") + else: + sorted_ranked_files = await database.fetch_one( + f"SELECT results FROM cache WHERE cacheKey = '{cache_key}'" + ) + sorted_ranked_files = json.loads(sorted_ranked_files[0]) + + debrid_extension = get_debrid_extension(config["debridService"]) + + balanced_hashes = get_balanced_hashes(sorted_ranked_files, config) + + results = [] + if ( + config["debridStreamProxyPassword"] != "" + and settings.PROXY_DEBRID_STREAM + and settings.PROXY_DEBRID_STREAM_PASSWORD + != config["debridStreamProxyPassword"] + ): + results.append( + { + "name": "[⚠️] Comet", + "title": "Debrid Stream Proxy Password incorrect.\nStreams will not be proxied.", + "url": "https://comet.fast", + } + ) + + for ( + hash, + hash_data, + ) in sorted_ranked_files.items(): + for resolution, hash_list in balanced_hashes.items(): + if hash in hash_list: + data = hash_data["data"] + 
results.append( + { + "name": f"[{debrid_extension}⚡] Comet {data['resolution']}", + "title": format_title(data, config), + "torrentTitle": ( + data["torrent_title"] + if "torrent_title" in data + else None + ), + "torrentSize": ( + data["torrent_size"] + if "torrent_size" in data + else None + ), + "url": f"{request.url.scheme}://{request.url.netloc}/{b64config}/playback/{hash}/{data['index']}", + "behaviorHints": { + "filename": data["raw_title"], + "bingeGroup": "comet|" + hash, + }, + } + ) + + continue + + return {"streams": results} + else: + logger.info(f"No cache found for {log_name} with user configuration") + if ( settings.PROXY_DEBRID_STREAM and settings.PROXY_DEBRID_STREAM_PASSWORD @@ -270,27 +352,25 @@ async def stream( debrid = getDebrid(session, config, get_client_ip(request)) + # TODO: cache whether the account has premium. Save ~400 ms for RD. check_premium = await debrid.check_premium() if not check_premium: additional_info = "" if config["debridService"] == "alldebrid": additional_info = "\nCheck your email!" - return { - "streams": [ - { - "name": "[⚠️] Comet", - "description": f"Invalid {config['debridService']} account.{additional_info}", - "url": "https://comet.fast", - } - ] - } + return stream_lookup_error(f"Invalid {config['debridService']} account.{additional_info}") indexer_manager_type = settings.INDEXER_MANAGER_TYPE search_indexer = len(config["indexers"]) != 0 - torrents = [] tasks = [] + # get_aliases is always the first task. + tasks.append( + get_aliases( + session, "movies" if type == "movie" else "shows", id + )) + if indexer_manager_type and search_indexer: logger.info( f"Start of {indexer_manager_type} search for {log_name} with indexers {config['indexers']}" @@ -312,7 +392,7 @@ async def stream( ) else: logger.info( - f"No indexer {'manager ' if not indexer_manager_type else ''}{'selected by user' if indexer_manager_type else 'defined'} for {log_name}" + f"No indexer {'selected by user' if indexer_manager_type else 'manager defined'} for {log_name}" ) if settings.ZILEAN_URL: @@ -324,13 +404,25 @@ async def stream( if settings.SCRAPE_MEDIAFUSION: tasks.append(get_mediafusion(log_name, type, full_id)) - search_response = await asyncio.gather(*tasks) - for results in search_response: - for result in results: - torrents.append(result) + search_responses = await asyncio.gather(*tasks) + + # get_aliases is always the first task. 
+ aliases = search_responses[0] + + remove_adult_content = ( + settings.REMOVE_ADULT_CONTENT and config["removeTrash"] + ) + + all_results = [] + matching_results = [] + for response in search_responses[1:]: + for result in response: + all_results.append(result) + if result.matches_title(name, year, year_end, aliases, remove_adult_content): + matching_results.append(result) logger.info( - f"{len(torrents)} unique torrents found for {log_name}" + f"{len(all_results)} unique torrents found ({len(matching_results)} after name filtering) for {log_name}" + ( " with " + ", ".join( @@ -355,67 +447,30 @@ async def stream( ) ) - if len(torrents) == 0: - return {"streams": []} - - if settings.TITLE_MATCH_CHECK: - aliases = await get_aliases( - session, "movies" if type == "movie" else "shows", id - ) - - indexed_torrents = [(i, torrents[i]["Title"]) for i in range(len(torrents))] - chunk_size = 50 - chunks = [ - indexed_torrents[i : i + chunk_size] - for i in range(0, len(indexed_torrents), chunk_size) - ] - - remove_adult_content = ( - settings.REMOVE_ADULT_CONTENT and config["removeTrash"] - ) - tasks = [] - for chunk in chunks: - tasks.append( - filter(chunk, name, year, year_end, aliases, remove_adult_content) - ) - - filtered_torrents = await asyncio.gather(*tasks) - index_less = 0 - for result in filtered_torrents: - for filtered in result: - if not filtered[1]: - del torrents[filtered[0] - index_less] - index_less += 1 - continue - - logger.info( - f"{len(torrents)} torrents passed title match check for {log_name}" - ) - - if len(torrents) == 0: - return {"streams": []} - - tasks = [] - for i in range(len(torrents)): - tasks.append(get_torrent_hash(session, (i, torrents[i]))) + name_matching_succeeded = False + # If we have results after name matching, use them. + if len(matching_results) > 0: + name_matching_succeeded = True + torrents = matching_results + elif len(all_results) > 0: + torrents = all_results + else: + return stream_lookup_error("No streams found!") - torrent_hashes = await asyncio.gather(*tasks) - index_less = 0 - for hash in torrent_hashes: - if not hash[1]: - del torrents[hash[0] - index_less] - index_less += 1 - continue + async with asyncio.TaskGroup() as tg: + for result in torrents: + # fetch_hash populates info_hash in result, if it's missing. 
+ tg.create_task(result.fetch_hash(session)) - torrents[hash[0] - index_less]["InfoHash"] = hash[1] + results_with_hashes = [torrent for torrent in torrents if torrent.info_hash is not None] - logger.info(f"{len(torrents)} info hashes found for {log_name}") + logger.info(f"{len(results_with_hashes)} info hashes found for {log_name}") - if len(torrents) == 0: - return {"streams": []} + if len(results_with_hashes) == 0: + return stream_lookup_error("No streams found!") files = await debrid.get_files( - list({hash[1] for hash in torrent_hashes if hash[1] is not None}), + [result.info_hash for result in results_with_hashes], type, season, episode, @@ -445,32 +500,20 @@ async def stream( if len_sorted_ranked_files == 0: if config["debridApiKey"] == "realdebrid": - return { - "streams": [ - { - "name": "[⚠️] Comet", - "description": "RealDebrid API is unstable!", - "url": "https://comet.fast", - } - ] - } - + return stream_lookup_error("RealDebrid API is unstable!") return {"streams": []} sorted_ranked_files = { key: (value.model_dump() if isinstance(value, Torrent) else value) for key, value in sorted_ranked_files.items() } + results_by_hash = {result.info_hash: result for result in results_with_hashes} for hash in sorted_ranked_files: # needed for caching sorted_ranked_files[hash]["data"]["title"] = files[hash]["title"] - sorted_ranked_files[hash]["data"]["torrent_title"] = torrents_by_hash[hash][ - "Title" - ] - sorted_ranked_files[hash]["data"]["tracker"] = torrents_by_hash[hash][ - "Tracker" - ] + sorted_ranked_files[hash]["data"]["torrent_title"] = results_by_hash[hash].title + sorted_ranked_files[hash]["data"]["tracker"] = results_by_hash[hash].tracker sorted_ranked_files[hash]["data"]["size"] = files[hash]["size"] - torrent_size = torrents_by_hash[hash]["Size"] + torrent_size = results_by_hash[hash].size sorted_ranked_files[hash]["data"]["torrent_size"] = ( torrent_size if torrent_size else files[hash]["size"] ) @@ -494,11 +537,12 @@ async def stream( != config["debridStreamProxyPassword"] ): results.append( - { - "name": "[⚠️] Comet", - "description": "Debrid Stream Proxy Password incorrect.\nStreams will not be proxied.", - "url": "https://comet.fast", - } + error_result("Debrid Stream Proxy Password incorrect.\nStreams will not be proxied.") + ) + + if not name_matching_succeeded: + results.append( + error_result("Name matching failed! Results may not be correct.") ) for resolution in balanced_hashes: @@ -577,6 +621,7 @@ async def playback(request: Request, b64config: str, hash: str, index: str): download_link = link else: # Cache expired, remove old entry + # TODO: Don't block on removing from the cache. 
await database.execute( f"DELETE FROM download_links WHERE debrid_key = '{config['debridApiKey']}' AND hash = '{hash}' AND file_index = '{index}'" ) @@ -600,6 +645,7 @@ async def playback(request: Request, b64config: str, hash: str, index: str): return FileResponse("comet/assets/uncached.mp4") # Cache the new download link + # TODO: Don't block on caching the link await database.execute( f"INSERT {'OR IGNORE ' if settings.DATABASE_TYPE == 'sqlite' else ''}INTO download_links (debrid_key, hash, file_index, link, timestamp) VALUES (:debrid_key, :hash, :file_index, :link, :timestamp){' ON CONFLICT DO NOTHING' if settings.DATABASE_TYPE == 'postgresql' else ''}", { @@ -628,33 +674,33 @@ async def playback(request: Request, b64config: str, hash: str, index: str): ): return FileResponse("comet/assets/proxylimit.mp4") - proxy = None + proxy = None - class Streamer: - def __init__(self, id: str): - self.id = id + class Streamer: + def __init__(self, id: str): + self.id = id self.client = httpx.AsyncClient(proxy=proxy, timeout=None) self.response = None - async def stream_content(self, headers: dict): - async with self.client.stream( - "GET", download_link, headers=headers - ) as self.response: - async for chunk in self.response.aiter_raw(): - yield chunk + async def stream_content(self, headers: dict): + async with self.client.stream( + "GET", download_link, headers=headers + ) as self.response: + async for chunk in self.response.aiter_raw(): + yield chunk - async def close(self): - await database.execute( - f"DELETE FROM active_connections WHERE id = '{self.id}'" - ) + async def close(self): + await database.execute( + f"DELETE FROM active_connections WHERE id = '{self.id}'" + ) - if self.response is not None: - await self.response.aclose() - if self.client is not None: - await self.client.aclose() + if self.response is not None: + await self.response.aclose() + if self.client is not None: + await self.client.aclose() - range_header = request.headers.get("range", "bytes=0-") + range_header = request.headers.get("range", "bytes=0-") try: response = await session.head( @@ -672,31 +718,30 @@ async def close(self): else: raise - if response.status == 206: - id = str(uuid.uuid4()) - await database.execute( - f"INSERT {'OR IGNORE ' if settings.DATABASE_TYPE == 'sqlite' else ''}INTO active_connections (id, ip, content, timestamp) VALUES (:id, :ip, :content, :timestamp){' ON CONFLICT DO NOTHING' if settings.DATABASE_TYPE == 'postgresql' else ''}", - { - "id": id, - "ip": ip, - "content": str(response.url), - "timestamp": current_time, - }, - ) + if response.status != 206: + return FileResponse("comet/assets/uncached.mp4") - streamer = Streamer(id) - - return StreamingResponse( - streamer.stream_content({"Range": range_header}), - status_code=206, - headers={ - "Content-Range": response.headers["Content-Range"], - "Content-Length": response.headers["Content-Length"], - "Accept-Ranges": "bytes", - }, - background=BackgroundTask(streamer.close), - ) + id = str(uuid.uuid4()) + await database.execute( + f"INSERT {'OR IGNORE ' if settings.DATABASE_TYPE == 'sqlite' else ''}INTO active_connections (id, ip, content, timestamp) VALUES (:id, :ip, :content, :timestamp){' ON CONFLICT DO NOTHING' if settings.DATABASE_TYPE == 'postgresql' else ''}", + { + "id": id, + "ip": ip, + "content": str(response.url), + "timestamp": current_time, + }, + ) - return FileResponse("comet/assets/uncached.mp4") + streamer = Streamer(id) + + return StreamingResponse( + streamer.stream_content({"Range": range_header}), + status_code=206, 
+        headers={
+            "Content-Range": response.headers["Content-Range"],
+            "Content-Length": response.headers["Content-Length"],
+            "Accept-Ranges": "bytes",
+        },
+        background=BackgroundTask(streamer.close),
+    )
 
-    return RedirectResponse(download_link, status_code=302)
diff --git a/comet/search/results.py b/comet/search/results.py
new file mode 100644
index 0000000..7c2dc8c
--- /dev/null
+++ b/comet/search/results.py
@@ -0,0 +1,71 @@
+import aiohttp
+import bencodepy
+import hashlib
+import re
+
+from RTN import parse, title_match
+from comet.utils.logger import logger
+from comet.utils.models import settings
+
+info_hash_pattern = re.compile(r"\b([a-fA-F0-9]{40})\b")
+
+class SearchResult:
+    def __init__(self, title: str, info_hash: str, tracker: str, size: str = None, link: str = None):
+        self.title = title
+        if "\n" in title:
+            # Torrentio title parsing
+            self.title = title.split("\n")[1]
+        self.parsed = parse(self.title)
+        self.info_hash = info_hash.lower() if info_hash else None
+        self.tracker = tracker
+        self.size = size
+        self.link = link
+
+    def matches_title(self,
+                      title: str,
+                      year: int,
+                      year_end: int,
+                      aliases: dict,
+                      remove_adult_content: bool):
+        if self.parsed.parsed_title and not title_match(title, self.parsed.parsed_title, aliases=aliases):
+            return False
+        if remove_adult_content and self.parsed.adult:
+            return False
+        if year and self.parsed.year:
+            if year_end:
+                if not year <= self.parsed.year <= year_end:
+                    return False
+            else:
+                if not (self.parsed.year - 1) <= year <= (self.parsed.year + 1):
+                    return False
+        return True
+
+    async def fetch_hash(self, session: aiohttp.ClientSession):
+        if self.info_hash is not None:
+            return
+
+        try:
+            timeout = aiohttp.ClientTimeout(total=settings.GET_TORRENT_TIMEOUT)
+            response = await session.get(self.link, allow_redirects=False, timeout=timeout)
+            if response.status == 200:
+                torrent_data = await response.read()
+                torrent_dict = bencodepy.decode(torrent_data)
+                info = bencodepy.encode(torrent_dict[b"info"])
+                hash = hashlib.sha1(info).hexdigest()
+            else:
+                location = response.headers.get("Location", "")
+                if not location:
+                    return
+
+                match = info_hash_pattern.search(location)
+                if not match:
+                    return
+
+                hash = match.group(1).upper()
+
+            self.info_hash = hash.lower()
+        except Exception as e:
+            logger.warning(
+                f"Exception while getting torrent info hash for {self.tracker}|{self.link}: {e}"
+            )
+            return
diff --git a/comet/utils/general.py b/comet/utils/general.py
index 074d2a0..858a175 100644
--- a/comet/utils/general.py
+++ b/comet/utils/general.py
@@ -15,6 +15,7 @@
 
 from comet.utils.logger import logger
 from comet.utils.models import database, settings, ConfigModel
+from comet.search.results import SearchResult
 
 languages_emojis = {
     "unknown": "❓",  # Unknown
@@ -294,7 +295,13 @@ async def fetch_jackett_results(
 
         all_results = await asyncio.gather(*tasks)
         for result_set in all_results:
-            results.extend(result_set)
+            for result in result_set:
+                results.append(SearchResult(
+                    title=result["Title"],
+                    info_hash=result["InfoHash"],
+                    tracker=result["Tracker"],
+                    size=result["Size"],
+                    link=result["Link"]))
 
     elif indexer_manager_type == "prowlarr":
         get_indexers = await session.get(
@@ -318,17 +325,12 @@
             response = await response.json()
 
             for result in response:
-                result["InfoHash"] = (
-                    result["infoHash"] if "infoHash" in result else None
-                )
-                result["Title"] = result["title"]
-                result["Size"] = result["size"]
-                result["Link"] = (
-                    result["downloadUrl"] if "downloadUrl" in result else None
-                )
-                result["Tracker"] = result["indexer"]
-
-                results.append(result)
+                results.append(SearchResult(
+                    title=result["title"],
+                    info_hash=result["infoHash"] if "infoHash" in result else None,
+                    tracker=result["indexer"],
+                    size=result["size"],
+                    link=result["downloadUrl"] if "downloadUrl" in result else None))
     except Exception as e:
         logger.warning(
             f"Exception while getting {indexer_manager_type} results for {query} with {indexers}: {e}"
         )
@@ -352,14 +354,11 @@ async def get_zilean(
         if isinstance(get_dmm, list):
            take_first = get_dmm[: settings.ZILEAN_TAKE_FIRST]
             for result in take_first:
-                object = {
-                    "Title": result["raw_title"],
-                    "InfoHash": result["info_hash"],
-                    "Size": result["size"],
-                    "Tracker": "DMM",
-                }
-
-                results.append(object)
+                results.append(SearchResult(
+                    title=result["raw_title"],
+                    info_hash=result["info_hash"],
+                    tracker="DMM",
+                    size=result["size"]))
 
         logger.info(f"{len(results)} torrents found for {log_name} with Zilean")
     except Exception as e:
@@ -392,14 +391,10 @@ async def get_torrentio(log_name: str, type: str, full_id: str):
             title = title_full.split("\n")[0]
             tracker = title_full.split("⚙️ ")[1].split("\n")[0]
 
-            results.append(
-                {
-                    "Title": title,
-                    "InfoHash": torrent["infoHash"],
-                    "Size": None,
-                    "Tracker": f"Torrentio|{tracker}",
-                }
-            )
+            results.append(SearchResult(
+                title=title,
+                info_hash=torrent["infoHash"],
+                tracker=f"Torrentio|{tracker}"))
 
         logger.info(f"{len(results)} torrents found for {log_name} with Torrentio")
     except Exception as e:
@@ -454,85 +449,6 @@ async def get_mediafusion(log_name: str, type: str, full_id: str):
     return results
 
 
-async def filter(
-    torrents: list,
-    name: str,
-    year: int,
-    year_end: int,
-    aliases: dict,
-    remove_adult_content: bool,
-):
-    results = []
-    for torrent in torrents:
-        index = torrent[0]
-        title = torrent[1]
-
-        if "\n" in title:  # Torrentio title parsing
-            title = title.split("\n")[1]
-
-        parsed = parse(title)
-
-        if remove_adult_content and parsed.adult:
-            results.append((index, False))
-            continue
-
-        if parsed.parsed_title and not title_match(
-            name, parsed.parsed_title, aliases=aliases
-        ):
-            results.append((index, False))
-            continue
-
-        if year and parsed.year:
-            if year_end is not None:
-                if not (year <= parsed.year <= year_end):
-                    results.append((index, False))
-                    continue
-            else:
-                if year < (parsed.year - 1) or year > (parsed.year + 1):
-                    results.append((index, False))
-                    continue
-
-        results.append((index, True))
-
-    return results
-
-
-async def get_torrent_hash(session: aiohttp.ClientSession, torrent: tuple):
-    index = torrent[0]
-    torrent = torrent[1]
-    if "InfoHash" in torrent and torrent["InfoHash"] is not None:
-        return (index, torrent["InfoHash"].lower())
-
-    url = torrent["Link"]
-
-    try:
-        timeout = aiohttp.ClientTimeout(total=settings.GET_TORRENT_TIMEOUT)
-        response = await session.get(url, allow_redirects=False, timeout=timeout)
-        if response.status == 200:
-            torrent_data = await response.read()
-            torrent_dict = bencodepy.decode(torrent_data)
-            info = bencodepy.encode(torrent_dict[b"info"])
-            hash = hashlib.sha1(info).hexdigest()
-        else:
-            location = response.headers.get("Location", "")
-            if not location:
-                return (index, None)
-
-            match = info_hash_pattern.search(location)
-            if not match:
-                return (index, None)
-
-            hash = match.group(1).upper()
-
-        return (index, hash.lower())
-    except Exception as e:
-        logger.warning(
-            f"Exception while getting torrent info hash for {torrent['indexer'] if 'indexer' in torrent else (torrent['Tracker'] if 'Tracker' in torrent else '')}|{url}: {e}"
-        )
-
-
return (index, None) - - def get_balanced_hashes(hashes: dict, config: dict): max_results = config["maxResults"] max_results_per_resolution = config["maxResultsPerResolution"] From caf9527fd1a580bb66438fc7c94361cbfaa35d09 Mon Sep 17 00:00:00 2001 From: magmadude <8989953+magmadude20@users.noreply.github.com> Date: Sat, 23 Nov 2024 21:38:29 -0600 Subject: [PATCH 2/2] Finish rebase --- comet/api/stream.py | 161 ++++++++++---------------------------------- 1 file changed, 36 insertions(+), 125 deletions(-) diff --git a/comet/api/stream.py b/comet/api/stream.py index 62052b2..02fe7ac 100644 --- a/comet/api/stream.py +++ b/comet/api/stream.py @@ -131,92 +131,6 @@ async def stream( if type == "series": log_name = f"{name} S{season:02d}E{episode:02d}" - cache_key = hashlib.md5( - json.dumps( - { - "debridService": config["debridService"], - "name": name, - "season": season, - "episode": episode, - "indexers": config["indexers"], - } - ).encode("utf-8") - ).hexdigest() - cached = await database.fetch_one( - f"SELECT EXISTS (SELECT 1 FROM cache WHERE cacheKey = '{cache_key}')" - ) - if cached[0] != 0: - logger.info(f"Cache found for {log_name}") - - timestamp = await database.fetch_one( - f"SELECT timestamp FROM cache WHERE cacheKey = '{cache_key}'" - ) - if timestamp[0] + settings.CACHE_TTL < time.time(): - await database.execute( - f"DELETE FROM cache WHERE cacheKey = '{cache_key}'" - ) - - logger.info(f"Cache expired for {log_name}") - else: - sorted_ranked_files = await database.fetch_one( - f"SELECT results FROM cache WHERE cacheKey = '{cache_key}'" - ) - sorted_ranked_files = json.loads(sorted_ranked_files[0]) - - debrid_extension = get_debrid_extension(config["debridService"]) - - balanced_hashes = get_balanced_hashes(sorted_ranked_files, config) - - results = [] - if ( - config["debridStreamProxyPassword"] != "" - and settings.PROXY_DEBRID_STREAM - and settings.PROXY_DEBRID_STREAM_PASSWORD - != config["debridStreamProxyPassword"] - ): - results.append( - { - "name": "[⚠️] Comet", - "title": "Debrid Stream Proxy Password incorrect.\nStreams will not be proxied.", - "url": "https://comet.fast", - } - ) - - for ( - hash, - hash_data, - ) in sorted_ranked_files.items(): - for resolution, hash_list in balanced_hashes.items(): - if hash in hash_list: - data = hash_data["data"] - results.append( - { - "name": f"[{debrid_extension}⚡] Comet {data['resolution']}", - "title": format_title(data, config), - "torrentTitle": ( - data["torrent_title"] - if "torrent_title" in data - else None - ), - "torrentSize": ( - data["torrent_size"] - if "torrent_size" in data - else None - ), - "url": f"{request.url.scheme}://{request.url.netloc}/{b64config}/playback/{hash}/{data['index']}", - "behaviorHints": { - "filename": data["raw_title"], - "bingeGroup": "comet|" + hash, - }, - } - ) - - continue - - return {"streams": results} - else: - logger.info(f"No cache found for {log_name} with user configuration") - if ( settings.PROXY_DEBRID_STREAM and settings.PROXY_DEBRID_STREAM_PASSWORD @@ -367,9 +281,9 @@ async def stream( tasks = [] # get_aliases is always the first task. 
tasks.append( - get_aliases( - session, "movies" if type == "movie" else "shows", id - )) + get_aliases( + session, "movies" if type == "movie" else "shows", id + )) if indexer_manager_type and search_indexer: logger.info( @@ -478,11 +392,11 @@ async def stream( ) ranked_files = set() - torrents_by_hash = {torrent["InfoHash"]: torrent for torrent in torrents} + results_by_hash = {result.info_hash: result for result in results_with_hashes} for hash in files: try: ranked_file = rtn.rank( - torrents_by_hash[hash]["Title"], + results_by_hash[hash].title, hash, remove_trash=False, # user can choose if he wants to remove it ) @@ -507,7 +421,6 @@ async def stream( key: (value.model_dump() if isinstance(value, Torrent) else value) for key, value in sorted_ranked_files.items() } - results_by_hash = {result.info_hash: result for result in results_with_hashes} for hash in sorted_ranked_files: # needed for caching sorted_ranked_files[hash]["data"]["title"] = files[hash]["title"] sorted_ranked_files[hash]["data"]["torrent_title"] = results_by_hash[hash].title @@ -621,7 +534,6 @@ async def playback(request: Request, b64config: str, hash: str, index: str): download_link = link else: # Cache expired, remove old entry - # TODO: Don't block on removing from the cache. await database.execute( f"DELETE FROM download_links WHERE debrid_key = '{config['debridApiKey']}' AND hash = '{hash}' AND file_index = '{index}'" ) @@ -645,7 +557,6 @@ async def playback(request: Request, b64config: str, hash: str, index: str): return FileResponse("comet/assets/uncached.mp4") # Cache the new download link - # TODO: Don't block on caching the link await database.execute( f"INSERT {'OR IGNORE ' if settings.DATABASE_TYPE == 'sqlite' else ''}INTO download_links (debrid_key, hash, file_index, link, timestamp) VALUES (:debrid_key, :hash, :file_index, :link, :timestamp){' ON CONFLICT DO NOTHING' if settings.DATABASE_TYPE == 'postgresql' else ''}", { @@ -657,31 +568,32 @@ async def playback(request: Request, b64config: str, hash: str, index: str): }, ) - if ( + if not ( settings.PROXY_DEBRID_STREAM and settings.PROXY_DEBRID_STREAM_PASSWORD == config["debridStreamProxyPassword"] ): - if settings.PROXY_DEBRID_STREAM_MAX_CONNECTIONS != -1: - active_ip_connections = await database.fetch_all( - "SELECT ip, COUNT(*) as connections FROM active_connections GROUP BY ip" - ) - if any( - connection["ip"] == ip - and connection["connections"] - >= settings.PROXY_DEBRID_STREAM_MAX_CONNECTIONS - for connection in active_ip_connections - ): - return FileResponse("comet/assets/proxylimit.mp4") + return RedirectResponse(download_link, status_code=302) + + if settings.PROXY_DEBRID_STREAM_MAX_CONNECTIONS != -1: + active_ip_connections = await database.fetch_all( + "SELECT ip, COUNT(*) as connections FROM active_connections GROUP BY ip" + ) + if any( + connection["ip"] == ip + and connection["connections"] + >= settings.PROXY_DEBRID_STREAM_MAX_CONNECTIONS + for connection in active_ip_connections + ): + return FileResponse("comet/assets/proxylimit.mp4") proxy = None class Streamer: def __init__(self, id: str): self.id = id - - self.client = httpx.AsyncClient(proxy=proxy, timeout=None) - self.response = None + self.client = httpx.AsyncClient(proxy=proxy, timeout=None) + self.response = None async def stream_content(self, headers: dict): async with self.client.stream( @@ -702,21 +614,21 @@ async def close(self): range_header = request.headers.get("range", "bytes=0-") - try: - response = await session.head( - download_link, headers={"Range": range_header} 
- ) - except aiohttp.ClientResponseError as e: - if e.status == 503 and config["debridService"] == "alldebrid": - proxy = ( - settings.DEBRID_PROXY_URL - ) # proxy is not needed to proxy realdebrid stream - - response = await session.head( - download_link, headers={"Range": range_header}, proxy=proxy - ) - else: - raise + try: + response = await session.head( + download_link, headers={"Range": range_header} + ) + except aiohttp.ClientResponseError as e: + if e.status == 503 and config["debridService"] == "alldebrid": + proxy = ( + settings.DEBRID_PROXY_URL + ) # proxy is not needed to proxy realdebrid stream + + response = await session.head( + download_link, headers={"Range": range_header}, proxy=proxy + ) + else: + raise if response.status != 206: return FileResponse("comet/assets/uncached.mp4") @@ -744,4 +656,3 @@ async def close(self): }, background=BackgroundTask(streamer.close), ) -
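
Reviewer note: a minimal end-to-end sketch of the flow these two patches
introduce. The SearchResult API (constructor, matches_title, fetch_hash) is
the one added in comet/search/results.py above; the indexer rows, tracker
name, sizes, and download URL below are made up for illustration, and running
the snippet assumes the project's settings (e.g. GET_TORRENT_TIMEOUT) can be
loaded.

    import asyncio

    import aiohttp

    from comet.search.results import SearchResult

    async def demo():
        # Hypothetical indexer rows -- real ones are built in
        # fetch_jackett_results / get_zilean / get_torrentio.
        rows = [
            {"Title": "Some.Movie.2020.1080p.WEB-DL", "InfoHash": "a" * 40,
             "Tracker": "ExampleTracker", "Size": 1474836480, "Link": None},
            {"Title": "Some.Movie.2020.720p.x264", "InfoHash": None,
             "Tracker": "ExampleTracker", "Size": 734003200,
             "Link": "https://tracker.example/dl/2.torrent"},
        ]
        results = [
            SearchResult(title=r["Title"], info_hash=r["InfoHash"],
                         tracker=r["Tracker"], size=r["Size"], link=r["Link"])
            for r in rows
        ]

        # matches_title() replaces the old chunked filter() helper: each
        # result carries its own RTN-parsed title, so the (index, bool)
        # tuple bookkeeping disappears.
        matching = [
            r for r in results
            if r.matches_title("Some Movie", 2020, None, {}, True)
        ]

        # fetch_hash() replaces get_torrent_hash(): a no-op for the first
        # row (hash already known), a .torrent/redirect lookup for the
        # second. It swallows errors, so the TaskGroup will not fail.
        async with aiohttp.ClientSession() as session:
            async with asyncio.TaskGroup() as tg:
                for r in matching:
                    tg.create_task(r.fetch_hash(session))

        # Mirrors stream.py: keep only results whose hash was resolved.
        return [r for r in matching if r.info_hash is not None]

    if __name__ == "__main__":
        print(asyncio.run(demo()))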