-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add TransmissionBt client implementation (#26)
* Add TransmissionBt client implementation * Made linting happy * Added tests --------- Co-authored-by: moleculekayak <[email protected]>
- Loading branch information
1 parent
19c824e
commit ca94825
Showing
6 changed files
with
349 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
import base64 | ||
import json | ||
from enum import Enum | ||
from http import HTTPStatus | ||
|
||
import requests | ||
|
||
from requests.auth import HTTPBasicAuth | ||
from requests.structures import CaseInsensitiveDict | ||
|
||
from ..filesystem import sane_join | ||
from ..parser import get_bencoded_data, calculate_infohash | ||
from ..errors import TorrentClientError, TorrentClientAuthenticationError, TorrentExistsInClientError | ||
from .torrent_client import TorrentClient | ||
|
||
|
||
class StatusEnum(Enum): | ||
STOPPED = 0 | ||
QUEUED_VERIFY = 1 | ||
VERIFYING = 2 | ||
QUEUE_DOWNLOAD = 3 | ||
DOWNLOADING = 4 | ||
QUEUED_SEED = 5 | ||
SEEDING = 6 | ||
|
||
|
||
class TransmissionBt(TorrentClient): | ||
X_TRANSMISSION_SESSION_ID = "X-Transmission-Session-Id" | ||
|
||
def __init__(self, rpc_url): | ||
super().__init__() | ||
transmission_url_parts = self._extract_credentials_from_url(rpc_url, "transmission/rpc") | ||
self._base_url = transmission_url_parts[0] | ||
self._basic_auth = HTTPBasicAuth(transmission_url_parts[1], transmission_url_parts[2]) | ||
self._transmission_session_id = None | ||
|
||
def setup(self): | ||
self.__authenticate() | ||
return self | ||
|
||
def get_torrent_info(self, infohash): | ||
response = self.__wrap_request( | ||
"torrent-get", | ||
arguments={"fields": ["labels", "downloadDir", "percentDone", "status", "doneDate", "name"], "ids": [infohash]}, | ||
) | ||
|
||
if response: | ||
try: | ||
parsed_response = json.loads(response) | ||
except json.JSONDecodeError as json_parse_error: | ||
raise TorrentClientError("Client returned malformed json response") from json_parse_error | ||
|
||
if not parsed_response.get("arguments", {}).get("torrents", []): | ||
raise TorrentClientError(f"Torrent not found in client ({infohash})") | ||
|
||
torrent = parsed_response["arguments"]["torrents"][0] | ||
torrent_completed = (torrent["percentDone"] == 1.0 or torrent["doneDate"] > 0) and torrent["status"] in [ | ||
StatusEnum.SEEDING.value, | ||
StatusEnum.QUEUED_SEED.value, | ||
] | ||
|
||
return { | ||
"complete": torrent_completed, | ||
"label": torrent["labels"], | ||
"save_path": torrent["downloadDir"], | ||
"content_path": sane_join(torrent["downloadDir"], torrent["name"]), | ||
} | ||
else: | ||
raise TorrentClientError("Client returned unexpected response") | ||
|
||
def inject_torrent(self, source_torrent_infohash, new_torrent_filepath, save_path_override=None): | ||
source_torrent_info = self.get_torrent_info(source_torrent_infohash) | ||
|
||
if not source_torrent_info["complete"]: | ||
raise TorrentClientError("Cannot inject a torrent that is not complete") | ||
|
||
new_torrent_infohash = calculate_infohash(get_bencoded_data(new_torrent_filepath)).lower() | ||
new_torrent_already_exists = self.__does_torrent_exist_in_client(new_torrent_infohash) | ||
if new_torrent_already_exists: | ||
raise TorrentExistsInClientError(f"New torrent already exists in client ({new_torrent_infohash})") | ||
|
||
self.__wrap_request( | ||
"torrent-add", | ||
arguments={ | ||
"download-dir": save_path_override if save_path_override else source_torrent_info["save_path"], | ||
"metainfo": base64.b64encode(open(new_torrent_filepath, "rb").read()).decode("utf-8"), | ||
"labels": source_torrent_info["label"], | ||
}, | ||
) | ||
|
||
return new_torrent_infohash | ||
|
||
def __authenticate(self): | ||
try: | ||
# This method specifically does not use the __wrap_request method | ||
# because we want to avoid an infinite loop of re-authenticating | ||
response = requests.post(self._base_url, auth=self._basic_auth) | ||
# TransmissionBt returns a 409 status code if the session id is invalid | ||
# (which it is on your first request) and includes a new session id in the response headers. | ||
if response.status_code == HTTPStatus.CONFLICT: | ||
self._transmission_session_id = response.headers.get(self.X_TRANSMISSION_SESSION_ID) | ||
else: | ||
response.raise_for_status() | ||
except requests.RequestException as e: | ||
raise TorrentClientAuthenticationError(f"TransmissionBt login failed: {e}") | ||
|
||
if not self._transmission_session_id: | ||
raise TorrentClientAuthenticationError("TransmissionBt login failed: Invalid username or password") | ||
|
||
def __wrap_request(self, method, arguments, files=None): | ||
try: | ||
return self.__request(method, arguments, files) | ||
except TorrentClientAuthenticationError: | ||
self.__authenticate() | ||
return self.__request(method, arguments, files) | ||
|
||
def __request(self, method, arguments=None, files=None): | ||
try: | ||
response = requests.post( | ||
self._base_url, | ||
auth=self._basic_auth, | ||
headers=CaseInsensitiveDict({self.X_TRANSMISSION_SESSION_ID: self._transmission_session_id}), | ||
json={"method": method, "arguments": arguments}, | ||
files=files, | ||
) | ||
|
||
response.raise_for_status() | ||
|
||
return response.text | ||
except requests.RequestException as e: | ||
if e.response.status_code == HTTPStatus.CONFLICT: | ||
raise TorrentClientAuthenticationError("Failed to authenticate with TransmissionBt") | ||
|
||
raise TorrentClientError(f"TransmissionBt request to '{self._base_url}' for method '{method}' failed: {e}") | ||
|
||
def __does_torrent_exist_in_client(self, infohash): | ||
try: | ||
return bool(self.get_torrent_info(infohash)) | ||
except TorrentClientError: | ||
return False |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
import re | ||
import pytest | ||
import requests_mock | ||
|
||
from tests.helpers import SetupTeardown, get_torrent_path | ||
|
||
from src.errors import TorrentClientError, TorrentClientAuthenticationError, TorrentExistsInClientError | ||
from src.clients.transmission import TransmissionBt | ||
|
||
|
||
@pytest.fixture | ||
def transmission_client(): | ||
return TransmissionBt("http://admin:supersecret@localhost:51314") | ||
|
||
|
||
@pytest.fixture | ||
def torrent_info_response(): | ||
return { | ||
"arguments": { | ||
"torrents": [ | ||
{ | ||
"name": "foo.torrent", | ||
"percentDone": 1.0, | ||
"doneDate": 0, | ||
"status": 6, | ||
"labels": ["bar"], | ||
"downloadDir": "/tmp/baz", | ||
} | ||
] | ||
} | ||
} | ||
|
||
|
||
class TestInit(SetupTeardown): | ||
def test_initializes_with_url_parts(self): | ||
transmission_client = TransmissionBt("http://admin:supersecret@localhost:51314") | ||
|
||
assert transmission_client._base_url == "http://localhost:51314/transmission/rpc" | ||
assert transmission_client._basic_auth.username == "admin" | ||
assert transmission_client._basic_auth.password == "supersecret" | ||
|
||
|
||
class TestSetup(SetupTeardown): | ||
def test_sets_session_id(self, transmission_client): | ||
assert transmission_client._transmission_session_id is None | ||
|
||
with requests_mock.Mocker() as m: | ||
m.post(re.compile("transmission/rpc"), headers={"X-Transmission-Session-Id": "1234"}, status_code=409) | ||
|
||
transmission_client.setup() | ||
|
||
assert transmission_client._transmission_session_id == "1234" | ||
|
||
def test_raises_exception_on_failed_auth(self, transmission_client): | ||
with requests_mock.Mocker() as m: | ||
m.post(re.compile("transmission/rpc"), status_code=403) | ||
|
||
with pytest.raises(TorrentClientAuthenticationError): | ||
transmission_client.setup() | ||
|
||
def test_raises_exception_if_no_session_id(self, transmission_client): | ||
with requests_mock.Mocker() as m: | ||
m.post(re.compile("transmission/rpc"), status_code=409) | ||
|
||
with pytest.raises(TorrentClientAuthenticationError): | ||
transmission_client.setup() | ||
|
||
|
||
class TestGetTorrentInfo(SetupTeardown): | ||
def test_returns_torrent_info(self, transmission_client, torrent_info_response): | ||
with requests_mock.Mocker() as m: | ||
m.post(re.compile("transmission/rpc"), json=torrent_info_response) | ||
|
||
response = transmission_client.get_torrent_info("infohash") | ||
|
||
assert response == { | ||
"complete": True, | ||
"label": ["bar"], | ||
"save_path": "/tmp/baz", | ||
"content_path": "/tmp/baz/foo.torrent", | ||
} | ||
|
||
def test_passes_headers_to_request(self, transmission_client, torrent_info_response): | ||
with requests_mock.Mocker() as m: | ||
m.post(re.compile("transmission/rpc"), headers={"X-Transmission-Session-Id": "1234"}, status_code=409) | ||
transmission_client.setup() | ||
|
||
with requests_mock.Mocker() as m: | ||
m.post(re.compile("transmission/rpc"), json=torrent_info_response) | ||
|
||
transmission_client.get_torrent_info("infohash") | ||
|
||
assert m.last_request.headers["X-Transmission-Session-Id"] == transmission_client._transmission_session_id | ||
|
||
def test_passes_json_body_to_request(self, transmission_client, torrent_info_response): | ||
with requests_mock.Mocker() as m: | ||
m.post(re.compile("transmission/rpc"), json=torrent_info_response) | ||
|
||
transmission_client.get_torrent_info("infohash") | ||
|
||
assert m.last_request.json() == { | ||
"method": "torrent-get", | ||
"arguments": { | ||
"ids": ["infohash"], | ||
"fields": ["labels", "downloadDir", "percentDone", "status", "doneDate", "name"], | ||
}, | ||
} | ||
|
||
def test_raises_if_json_error(self, transmission_client): | ||
with requests_mock.Mocker() as m: | ||
m.post(re.compile("transmission/rpc"), text="not json") | ||
|
||
with pytest.raises(TorrentClientError, match="Client returned malformed json response"): | ||
transmission_client.get_torrent_info("infohash") | ||
|
||
def test_raises_if_no_torrents_found(self, transmission_client): | ||
with requests_mock.Mocker() as m: | ||
m.post(re.compile("transmission/rpc"), json={"arguments": {"torrents": []}}) | ||
|
||
with pytest.raises(TorrentClientError, match="Torrent not found in client"): | ||
transmission_client.get_torrent_info("infohash") | ||
|
||
def test_raises_on_unexpected_response(self, transmission_client): | ||
with requests_mock.Mocker() as m: | ||
m.post(re.compile("transmission/rpc"), text="") | ||
|
||
with pytest.raises(TorrentClientError, match="Client returned unexpected response"): | ||
transmission_client.get_torrent_info("infohash") | ||
|
||
|
||
class TestInjectTorrent(SetupTeardown): | ||
def test_injects_torrent(self, transmission_client, torrent_info_response): | ||
torrent_path = get_torrent_path("red_source") | ||
|
||
with requests_mock.Mocker() as m: | ||
m.post( | ||
re.compile("transmission/rpc"), [{"json": torrent_info_response}, {"json": {"arguments": {"torrents": []}}}] | ||
) | ||
|
||
transmission_client.inject_torrent("foo", torrent_path) | ||
|
||
assert b'"method": "torrent-add"' in m.request_history[-1].body | ||
assert b'"download-dir": "/tmp/baz"' in m.request_history[-1].body | ||
assert b'"labels": ["bar"]' in m.request_history[-1].body | ||
assert b'"metainfo"' in m.request_history[-1].body | ||
|
||
def test_uses_save_path_override_if_present(self, transmission_client, torrent_info_response): | ||
torrent_path = get_torrent_path("red_source") | ||
|
||
with requests_mock.Mocker() as m: | ||
m.post( | ||
re.compile("transmission/rpc"), [{"json": torrent_info_response}, {"json": {"arguments": {"torrents": []}}}] | ||
) | ||
|
||
transmission_client.inject_torrent("foo", torrent_path, "/tmp/override/") | ||
|
||
assert b'"download-dir": "/tmp/override/"' in m.request_history[-1].body | ||
|
||
def test_raises_if_source_torrent_isnt_found_in_client(self, transmission_client): | ||
with requests_mock.Mocker() as m: | ||
m.post( | ||
re.compile("transmission/rpc"), | ||
[{"json": {"arguments": {"torrents": []}}}, {"json": {"arguments": {"torrents": []}}}], | ||
) | ||
|
||
with pytest.raises(TorrentClientError, match="Torrent not found in client"): | ||
transmission_client.inject_torrent("foo", "bar.torrent") | ||
|
||
def test_raises_if_destination_torrent_is_found_in_client(self, transmission_client, torrent_info_response): | ||
torrent_path = get_torrent_path("red_source") | ||
|
||
with requests_mock.Mocker() as m: | ||
m.post(re.compile("transmission/rpc"), [{"json": torrent_info_response}, {"json": torrent_info_response}]) | ||
|
||
with pytest.raises(TorrentExistsInClientError, match="New torrent already exists in client"): | ||
transmission_client.inject_torrent("foo", torrent_path) |
Oops, something went wrong.