Skip to content

Commit

Permalink
Infohash precheck (#3)
Browse files Browse the repository at this point in the history
* Updated infohash methods

* Hooked up infohash prechecking for directory scanner

* Hooked up new entrypoint for scanning single torrent files
  • Loading branch information
moleculekayak authored Jul 30, 2024
1 parent 04b0c30 commit cc0067c
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 78 deletions.
9 changes: 3 additions & 6 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
from src.api import RedAPI, OpsAPI
from src.args import parse_args
from src.config import Config
from src.torrent import generate_new_torrent_from_file
from src.scanner import scan_torrent_directory
from src.scanner import scan_torrent_directory, scan_torrent_file

from src.webserver import run_webserver

Expand All @@ -22,11 +21,9 @@ def cli_entrypoint():
if args.server:
run_webserver(args.input_directory, args.output_directory, red_api, ops_api, port=os.environ.get("PORT", 9713))
elif args.input_file:
_, torrent_path = generate_new_torrent_from_file(args.input_file, args.output_directory, red_api, ops_api)
print(torrent_path)
print(scan_torrent_file(args.input_file, args.output_directory, red_api, ops_api))
elif args.input_directory:
report = scan_torrent_directory(args.input_directory, args.output_directory, red_api, ops_api)
print(report)
print(scan_torrent_directory(args.input_directory, args.output_directory, red_api, ops_api))
except Exception as e:
print(f"{Fore.RED}{str(e)}{Fore.RESET}")
exit(1)
Expand Down
9 changes: 5 additions & 4 deletions src/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@ def get_origin_tracker(torrent_data: dict) -> RedTracker | OpsTracker | None:
return None


def calculate_infohash(torrent_data: dict) -> str:
    """
    Calculates the infohash of a decoded torrent.

    Args:
        `torrent_data` (`dict`): The decoded torrent data; must contain the `b"info"` key.
    Returns:
        The uppercase hexadecimal SHA-1 digest of the bencoded `info` dictionary.
    """
    encoded_info = bencoder.encode(torrent_data[b"info"])
    digest = sha1(encoded_info)

    return digest.hexdigest().upper()


def recalculate_hash_for_new_source(torrent_data: dict, new_source: (bytes | str)) -> str:
    """
    Calculates the infohash the torrent would have with a different source flag.

    The input is deep-copied before being modified, so the caller's `torrent_data` is not mutated.

    Args:
        `torrent_data` (`dict`): The decoded torrent data; must contain the `b"info"` key.
        `new_source` (`bytes | str`): The source flag to apply. A `str` is encoded to bytes first.
    Returns:
        The infohash of the modified copy, as computed by `calculate_infohash`.
    """
    torrent_data = copy.deepcopy(torrent_data)
    new_source = new_source.encode() if isinstance(new_source, str) else new_source

    torrent_data[b"info"][b"source"] = new_source

    # Single exit point: delegate to the shared helper rather than hashing inline into a local
    # named `hash`, which shadowed the builtin and left a second, unreachable return behind.
    return calculate_infohash(torrent_data)


def get_torrent_data(filename: str) -> dict:
Expand Down
68 changes: 63 additions & 5 deletions src/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,47 @@
from .filesystem import mkdir_p, list_files_of_extension, assert_path_exists
from .progress import Progress
from .torrent import generate_new_torrent_from_file
from .parser import get_torrent_data, calculate_infohash
from .errors import TorrentDecodingError, UnknownTrackerError, TorrentNotFoundError, TorrentAlreadyExistsError


def scan_torrent_file(
    torrent_path: str,
    output_directory: str,
    red_api: RedAPI,
    ops_api: OpsAPI,
) -> str:
    """
    Generates the reciprocal-tracker .torrent for a single input .torrent file.

    Infohashes of the .torrent files already present in the output directory are collected first
    and handed to `generate_new_torrent_from_file` so it can check against them.

    Args:
        `torrent_path` (`str`): The path to the .torrent file.
        `output_directory` (`str`): The directory the new .torrent file is saved to.
        `red_api` (`RedAPI`): The pre-configured RED tracker API.
        `ops_api` (`OpsAPI`): The pre-configured OPS tracker API.
    Returns:
        str: The path to the new .torrent file.
    Raises:
        See `generate_new_torrent_from_file`.
    """
    torrent_path = assert_path_exists(torrent_path)
    output_directory = mkdir_p(output_directory)

    existing_output_infohashes = __collect_infohashes_from_files(
        list_files_of_extension(output_directory, ".torrent")
    )

    # Only the generated file's path is part of this function's contract; the tracker is dropped.
    _tracker, generated_path = generate_new_torrent_from_file(
        torrent_path,
        output_directory,
        red_api,
        ops_api,
        input_infohashes={},
        output_infohashes=existing_output_infohashes,
    )

    return generated_path


def scan_torrent_directory(
input_directory: str,
output_directory: str,
Expand All @@ -29,10 +67,15 @@ def scan_torrent_directory(

input_directory = assert_path_exists(input_directory)
output_directory = mkdir_p(output_directory)
local_torrents = list_files_of_extension(input_directory, ".torrent")
p = Progress(len(local_torrents))

for i, torrent_path in enumerate(local_torrents, 1):
input_torrents = list_files_of_extension(input_directory, ".torrent")
output_torrents = list_files_of_extension(output_directory, ".torrent")
input_infohashes = __collect_infohashes_from_files(input_torrents)
output_infohashes = __collect_infohashes_from_files(output_torrents)

p = Progress(len(input_torrents))

for i, torrent_path in enumerate(input_torrents, 1):
basename = os.path.basename(torrent_path)
print(f"({i}/{p.total}) {basename}")

Expand All @@ -42,6 +85,8 @@ def scan_torrent_directory(
output_directory,
red_api,
ops_api,
input_infohashes,
output_infohashes,
)

p.generated.print(
Expand All @@ -53,8 +98,8 @@ def scan_torrent_directory(
except UnknownTrackerError as e:
p.skipped.print(str(e))
continue
except TorrentAlreadyExistsError:
p.already_exists.print("Found, but the output .torrent already exists.")
except TorrentAlreadyExistsError as e:
p.already_exists.print(str(e))
continue
except TorrentNotFoundError as e:
p.not_found.print(str(e))
Expand All @@ -64,3 +109,16 @@ def scan_torrent_directory(
continue

return p.report()


def __collect_infohashes_from_files(files: list[str]) -> dict:
    """
    Builds a lookup of infohash -> torrent name for the given .torrent files.

    Files for which `get_torrent_data` returns a falsy value are skipped.

    Args:
        `files` (`list[str]`): Paths of the .torrent files to index.
    Returns:
        A dictionary mapping each infohash to the `name` stored in that torrent's `info` dictionary.
    """
    infohash_dict = {}

    for filename in files:
        torrent_data = get_torrent_data(filename)

        if not torrent_data:
            continue

        infohash = calculate_infohash(torrent_data)

        # The name is only interpolated into user-facing messages, so a name that is not valid
        # UTF-8 must not abort the whole scan: undecodable bytes are replaced instead of raising.
        infohash_dict[infohash] = torrent_data[b"info"][b"name"].decode(errors="replace")

    return infohash_dict
46 changes: 21 additions & 25 deletions src/torrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,21 @@
from .api import RedAPI, OpsAPI
from .trackers import RedTracker, OpsTracker
from .errors import TorrentDecodingError, UnknownTrackerError, TorrentNotFoundError, TorrentAlreadyExistsError
from .parser import get_torrent_data, get_origin_tracker, recalculate_hash_for_new_source, save_torrent_data
from .parser import (
get_torrent_data,
get_origin_tracker,
recalculate_hash_for_new_source,
save_torrent_data,
)


def generate_new_torrent_from_file(
old_torrent_path: str,
output_directory: str,
red_api: RedAPI,
ops_api: OpsAPI,
input_infohashes: dict = {},
output_infohashes: dict = {},
) -> tuple[OpsTracker | RedTracker, str]:
"""
Generates a new torrent file for the reciprocal tracker of the original torrent file if it exists on the reciprocal tracker.
Expand All @@ -22,13 +29,15 @@ def generate_new_torrent_from_file(
`output_directory` (`str`): The directory to save the new torrent file.
`red_api` (`RedApi`): The pre-configured API object for RED.
`ops_api` (`OpsApi`): The pre-configured API object for OPS.
`input_infohashes` (`dict`, optional): A dictionary of infohashes and their filenames from the input directory for caching purposes. Defaults to an empty dictionary.
`output_infohashes` (`dict`, optional): A dictionary of infohashes and their filenames from the output directory for caching purposes. Defaults to an empty dictionary.
Returns:
A tuple containing the new tracker class (`RedTracker` or `OpsTracker`) and the path to the new torrent file.
Raises:
`TorrentDecodingError`: if the original torrent file could not be decoded.
`UnknownTrackerError`: if the original torrent file is not from OPS or RED.
`TorrentNotFoundError`: if the original torrent file could not be found on the reciprocal tracker.
`TorrentAlreadyExistsError`: if the new torrent file already exists in the output directory.
`TorrentAlreadyExistsError`: if the new torrent file already exists in the input or output directory.
`Exception`: if an unknown error occurs.
"""

Expand All @@ -39,6 +48,12 @@ def generate_new_torrent_from_file(

for new_source in new_tracker.source_flags_for_creation():
new_hash = recalculate_hash_for_new_source(old_torrent_data, new_source)

if new_hash in input_infohashes:
raise TorrentAlreadyExistsError(f"Torrent already exists in input directory as {input_infohashes[new_hash]}")
if new_hash in output_infohashes:
raise TorrentAlreadyExistsError(f"Torrent already exists in output directory as {output_infohashes[new_hash]}")

api_response = new_tracker_api.find_torrent(new_hash)

if api_response["status"] == "success":
Expand All @@ -49,11 +64,11 @@ def generate_new_torrent_from_file(
)

if new_torrent_filepath:
torrent_id = get_torrent_id(api_response)
torrent_id = __get_torrent_id(api_response)

new_torrent_data[b"info"][b"source"] = new_source # This is already bytes rather than str
new_torrent_data[b"announce"] = new_tracker_api.announce_url.encode()
new_torrent_data[b"comment"] = generate_torrent_url(new_tracker_api.site_url, torrent_id).encode()
new_torrent_data[b"comment"] = __generate_torrent_url(new_tracker_api.site_url, torrent_id).encode()

return (new_tracker, save_torrent_data(new_torrent_filepath, new_torrent_data))
elif api_response["error"] in ("bad hash parameter", "bad parameters"):
Expand Down Expand Up @@ -86,30 +101,11 @@ def generate_torrent_output_filepath(api_response: dict, new_source: str, output
return torrent_filepath


def get_torrent_id(api_response: dict) -> str:
"""
Extracts the torrent ID from the API response.
Args:
`api_response` (`dict`): The response from the tracker API.
Returns:
The torrent ID.
"""

def __get_torrent_id(api_response: dict) -> str:
return api_response["response"]["torrent"]["id"]


def generate_torrent_url(site_url: str, torrent_id: str) -> str:
"""
Generates the URL to the torrent on the tracker.
Args:
`site_url` (`str`): The base URL of the tracker.
`torrent_id` (`str`): The ID of the torrent.
Returns:
The URL to the torrent.
"""

def __generate_torrent_url(site_url: str, torrent_id: str) -> str:
return f"{site_url}/torrents.php?torrentid={torrent_id}"


Expand Down
4 changes: 2 additions & 2 deletions src/webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from flask import Flask, request

from src.parser import is_valid_infohash
from src.torrent import generate_new_torrent_from_file
from src.scanner import scan_torrent_file
from src.errors import TorrentAlreadyExistsError, TorrentNotFoundError

app = Flask(__name__)
Expand All @@ -24,7 +24,7 @@ def webhook():
return http_error(f"No torrent found at {filepath}", 404)

try:
_, new_filepath = generate_new_torrent_from_file(
new_filepath = scan_torrent_file(
filepath,
config["output_dir"],
config["red_api"],
Expand Down
23 changes: 16 additions & 7 deletions tests/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
get_origin_tracker,
recalculate_hash_for_new_source,
save_torrent_data,
calculate_infohash,
)


class TestParserIsValidInfohash(SetupTeardown):
class TestIsValidInfohash(SetupTeardown):
def test_returns_true_for_valid_infohash(self):
assert is_valid_infohash("0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33")

Expand All @@ -25,23 +26,23 @@ def test_returns_false_for_invalid_infohash(self):
assert not is_valid_infohash(123)


class TestParserGetSource(SetupTeardown):
class TestGetSource(SetupTeardown):
def test_returns_source_if_present(self):
assert get_source({b"info": {b"source": b"FOO"}}) == b"FOO"

def test_returns_none_if_absent(self):
assert get_source({}) is None


class TestParserGetAnnounceUrl(SetupTeardown):
class TestGetAnnounceUrl(SetupTeardown):
def test_returns_url_if_present(self):
assert get_announce_url({b"announce": b"https://foo.bar"}) == b"https://foo.bar"

def test_returns_none_if_absent(self):
assert get_announce_url({}) is None


class TestParserGetOriginTracker(SetupTeardown):
class TestGetOriginTracker(SetupTeardown):
def test_returns_red_based_on_source(self):
assert get_origin_tracker({b"info": {b"source": b"RED"}}) == RedTracker
assert get_origin_tracker({b"info": {b"source": b"PTH"}}) == RedTracker
Expand All @@ -61,7 +62,15 @@ def test_returns_none_if_no_match(self):
assert get_origin_tracker({b"announce": b"https://foo/123abc"}) is None


class TestParserReplaceSourceAndReturnHash(SetupTeardown):
class TestCalculateInfohash(SetupTeardown):
    """Pins the infohash `calculate_infohash` produces for a minimal torrent dict."""

    def test_returns_infohash(self):
        expected_infohash = "FD2F1D966DF7E2E35B0CF56BC8510C6BB4D44467"

        assert calculate_infohash({b"info": {b"source": b"RED"}}) == expected_infohash


class TestRecalculateHashForNewSource(SetupTeardown):
def test_replaces_source_and_returns_hash(self):
torrent_data = {b"info": {b"source": b"RED"}}
new_source = b"OPS"
Expand All @@ -79,7 +88,7 @@ def test_doesnt_mutate_original_dict(self):
assert torrent_data == {b"info": {b"source": b"RED"}}


class TestParserGetTorrentData(SetupTeardown):
class TestGetTorrentData(SetupTeardown):
def test_returns_torrent_data(self):
result = get_torrent_data(get_torrent_path("no_source"))

Expand All @@ -92,7 +101,7 @@ def test_returns_none_on_error(self):
assert result is None


class TestParserSaveTorrentData(SetupTeardown):
class TestSaveTorrentData(SetupTeardown):
def test_saves_torrent_data(self):
torrent_data = {b"info": {b"source": b"RED"}}
filename = "/tmp/test_save_torrent_data.torrent"
Expand Down
Loading

0 comments on commit cc0067c

Please sign in to comment.