From 9efca3c3a4f7bb64f0544126cc4819302bfaccff Mon Sep 17 00:00:00 2001 From: Nathan Gillett Date: Fri, 15 Mar 2024 16:18:43 -0500 Subject: [PATCH] Replace usage of _pulp_client with pubtools-pulplib [RHELDST-141] This commit removes the library's native _pulp_client ("legacy") and replaces it with usage of pubtools-pulplib methods for associating and unassociating content. Additionally, PulpActions were replaced with simple abstractions for grouping content to be associated/unassociated. Tests were removed/edited as necessary. --- tests/integration/test_association.py | 48 +-- tests/test_pulp.py | 387 ----------------------- tests/test_ubipop.py | 231 +++----------- tests/test_utils.py | 85 +----- ubipop/__init__.py | 425 ++++++++++++++------------ ubipop/_pulp_client.py | 220 ------------- ubipop/_utils.py | 117 ++----- 7 files changed, 308 insertions(+), 1205 deletions(-) delete mode 100644 tests/test_pulp.py delete mode 100644 ubipop/_pulp_client.py diff --git a/tests/integration/test_association.py b/tests/integration/test_association.py index e3bc0f8..942e898 100644 --- a/tests/integration/test_association.py +++ b/tests/integration/test_association.py @@ -1,24 +1,22 @@ +import json +import logging import os import subprocess -import logging + import pytest import requests import ubiconfig -import ubipop -import json - -from ubipop._pulp_client import Pulp -from ubipop.ubi_manifest_client.client import Client - from pubtools.pulplib import ( Client, Criteria, Matcher, + ModulemdDefaultsUnit, ModulemdUnit, RpmUnit, - ModulemdDefaultsUnit, ) +import ubipop + PULP_HOSTNAME = os.getenv("TEST_PULP_HOSTNAME") PULP_USER = os.getenv("TEST_PULP_USER") PULP_PWD = os.getenv("TEST_PULP_PWD") @@ -79,37 +77,23 @@ def run_ubipop_tool(content_set, workers=10, dry_run=False): def get_repos_from_cs(cs, skip_dot_version=False): + kwargs = {"verify": PULP_SECURE} if PULP_CERT_PATH is not None: - p = Pulp( - PULP_HOSTNAME, cert=(PULP_CERT_PATH, PULP_KEY_PATH), verify=PULP_SECURE - ) + kwargs["cert"] = (PULP_CERT_PATH, PULP_KEY_PATH) else: - p = Pulp(PULP_HOSTNAME, auth=(PULP_USER, PULP_PWD), verify=PULP_SECURE) - - ret = p.do_request( - "post", - "repositories/search/", - { - "criteria": { - "filters": {"notes.content_set": cs}, - }, - "distributors": False, - }, - ) - - ret.raise_for_status() + kwargs["auth"] = (PULP_USER, PULP_PWD) + p = Client("https://" + PULP_HOSTNAME, **kwargs) - for item in ret.json(): - notes = item["notes"] + repos = p.search_repository(Criteria.with_field("notes.content_set", cs)) - if skip_dot_version and "." in notes["platform_full_version"]: + for repo in repos: + if skip_dot_version and "." in repo.platform_full_version: continue yield { - "id": item["id"], - "name": item["display_name"], - "url": notes["relative_url"], - "arch": notes["arch"], + "id": repo.id, + "url": repo.relative_url, + "arch": repo.arch, } diff --git a/tests/test_pulp.py b/tests/test_pulp.py deleted file mode 100644 index 12a757e..0000000 --- a/tests/test_pulp.py +++ /dev/null @@ -1,387 +0,0 @@ -import threading - -try: - from http.client import HTTPResponse -except ImportError: - from httplib import HTTPResponse - -from io import BytesIO - -import logging -import sys -import pytest -import requests - -from mock import MagicMock, patch -from requests.exceptions import HTTPError - -from pubtools.pulplib import YumRepository -from ubipop import _pulp_client as pulp_client -from ubipop._pulp_client import Pulp, PulpRetryAdapter - -from .conftest import ( - get_rpm_unit, - get_modulemd_defaults_unit, -) - -ORIG_HTTP_TOTAL_RETRIES = pulp_client.HTTP_TOTAL_RETRIES -ORIG_HTTP_RETRY_BACKOFF = pulp_client.HTTP_RETRY_BACKOFF - - -@pytest.fixture(name="mock_pulp") -def fixture_mock_pulp(): - yield Pulp("foo.pulp.com") - - -@pytest.fixture(name="mock_repo") -def fixture_mock_repo(): - yield YumRepository( - id="test_repo", - content_set="test_repo-source-rpms", - ubi_config_version="7.9", - ubi_population=True, - population_sources=["a", "b"], - ) - - -@pytest.fixture(name="mock_package") -def fixture_mock_package(): - yield get_rpm_unit( - name="foo-pkg", - version="10", - release="1", - arch="x86_64", - filename="foo-pkg.rpm", - src_repo_id="src_repo_id", - ) - - -@pytest.fixture(name="mock_mdd") -def fixture_mock_mdd(): - yield get_modulemd_defaults_unit( - name="virt", - stream="rhel", - profiles={"2.6": ["common"]}, - repo_id="src_repo_id", - src_repo_id="src_repo_id", - ) - - -@pytest.fixture(name="mock_response_for_async_req") -def fixture_mock_response_for_async_req(): - yield {"spawned_tasks": [{"task_id": "foo_task_id"}]} - - -@pytest.fixture(name="mock_associate") -def fixture_mock_associate(requests_mock, mock_repo, mock_response_for_async_req): - url = "/pulp/api/v2/repositories/{dst_repo}/actions/associate/".format( - dst_repo=mock_repo.id - ) - yield requests_mock.register_uri("POST", url, json=mock_response_for_async_req) - - -@pytest.fixture(name="mock_unassociate") -def fixture_mock_unassociate(requests_mock, mock_repo, mock_response_for_async_req): - url = "/pulp/api/v2/repositories/{dst_repo}/actions/unassociate/".format( - dst_repo=mock_repo.id - ) - requests_mock.register_uri("POST", url, json=mock_response_for_async_req) - - -@pytest.fixture(name="set_logging") -def fixture_set_logging(): - logger = logging.getLogger("ubipop") - logger.setLevel(logging.INFO) - yield logger - logger.handlers = [] - - -def test_associate_packages(mock_pulp, mock_associate, mock_repo, mock_package): - # pylint: disable=unused-argument - task_ids = mock_pulp.associate_packages(mock_repo, mock_repo, [mock_package]) - assert len(task_ids[0]) - assert task_ids[0] == "foo_task_id" - - -def test_associate_packages_log( - capsys, mock_pulp, mock_associate, set_logging, mock_repo, mock_package -): - # pylint: disable=unused-argument - set_logging.addHandler(logging.StreamHandler(sys.stdout)) - mock_pulp.associate_packages(mock_repo, mock_repo, [mock_package]) - out, _ = capsys.readouterr() - assert ( - out.strip() == "Associating rpm,srpm(foo-pkg.rpm) from test_repo to test_repo" - ) - - -def test_unassociate_packages(mock_pulp, mock_unassociate, mock_repo, mock_package): - # pylint: disable=unused-argument - task_ids = mock_pulp.unassociate_packages(mock_repo, [mock_package]) - assert len(task_ids[0]) - assert task_ids[0] == "foo_task_id" - - -def test_associate_module_defaults(mock_pulp, mock_associate, mock_repo, mock_mdd): - # pylint: disable=unused-argument - task_ids = mock_pulp.associate_module_defaults(mock_repo, mock_repo, [mock_mdd]) - assert task_ids - assert task_ids[0] == "foo_task_id" - - -def test_unassociate_module_defaults(mock_pulp, mock_unassociate, mock_repo, mock_mdd): - # pylint: disable=unused-argument - task_ids = mock_pulp.unassociate_module_defaults(mock_repo, [mock_mdd]) - assert task_ids - assert task_ids[0] == "foo_task_id" - - -def test_unassociate_module_defaults_log( - capsys, mock_pulp, mock_unassociate, set_logging, mock_repo, mock_mdd -): - # pylint: disable=unused-argument - set_logging.addHandler(logging.StreamHandler(sys.stdout)) - mock_pulp.unassociate_module_defaults(mock_repo, [mock_mdd]) - out, _ = capsys.readouterr() - assert out.strip() == "Unassociating modulemd_defaults(virt:rhel) from test_repo" - - -@pytest.fixture(name="search_task_response") -def fixture_search_task_response(): - yield {"state": "finished", "task_id": "test_task"} - - -@pytest.fixture(name="mock_search_task") -def fixture_mock_search_task(requests_mock, search_task_response): - url = "/pulp/api/v2/tasks/{task_id}/".format(task_id="test_task") - requests_mock.register_uri("GET", url, json=search_task_response) - - -def test_wait_for_tasks(mock_pulp, mock_search_task): - # pylint: disable=unused-argument - results = mock_pulp.wait_for_tasks(["test_task"]) - assert len(results) == 1 - assert results["test_task"]["state"] == "finished" - - -@pytest.fixture(name="mocked_getresponse") -def fixture_mocked_getresponse(): - with patch( - "urllib3.connectionpool.HTTPConnectionPool._get_conn" - ) as mocked_get_conn: - yield mocked_get_conn.return_value.getresponse - - -def make_mock_response(status, text): - response_string = ( - "HTTP/1.1 {0} Reason\r\n" "Content-Type: application/json\r\n" "\r\n" "{1}" - ).format(status, text) - - mocked_sock = MagicMock() - mocked_sock.makefile.return_value = BytesIO(response_string.encode()) - - http_response = HTTPResponse(mocked_sock) - http_response.begin() - - return http_response - - -@pytest.mark.parametrize( - "should_retry,err_status_code,env_retries,retry_call,retry_args,ok_response,expected_retries", - [ - # test everything is retryable - ( - True, - 500, - None, - "wait_for_tasks", - (["fake-tid"],), - '{"state":"finished","task_id":"fake-tid"}', - pulp_client.HTTP_TOTAL_RETRIES, - ), - ( - True, - 500, - None, - "search_tasks", - ([MagicMock()],), - "[]", - pulp_client.HTTP_TOTAL_RETRIES, - ), - ( - True, - 500, - None, - "unassociate_units", - ((2 * (MagicMock(),)) + (["rpm"],)), - '{"spawned_tasks":[]}', - pulp_client.HTTP_TOTAL_RETRIES, - ), - ( - True, - 500, - None, - "associate_units", - ( - MagicMock(id="fake_id"), - MagicMock(id="fake_id"), - MagicMock(), - ["rpm"], - ), - '{"spawned_tasks":[]}', - pulp_client.HTTP_TOTAL_RETRIES, - ), - # test custom number of retries - ( - True, - 500, - 3, - "associate_units", - ( - MagicMock(id="fake_id"), - MagicMock(id="fake_id"), - MagicMock(), - ["rpm"], - ), - '{"spawned_tasks":[]}', - 3, - ), - # test 400 status is not retryable - ( - False, - 400, - 3, - "associate_units", - ( - MagicMock(id="fake_id"), - MagicMock(id="fake_id"), - MagicMock(), - ["rpm"], - ), - '{"spawned_tasks":[]}', - 3, - ), - ], -) -def test_retries( - mocked_getresponse, - mock_pulp, - should_retry, - err_status_code, - env_retries, - retry_call, - retry_args, - ok_response, - expected_retries, -): - pulp_client.HTTP_RETRY_BACKOFF = 0 - - try: - if env_retries: - pulp_client.HTTP_TOTAL_RETRIES = env_retries - - retries = [ - make_mock_response(err_status_code, "Fake Http error") - for _ in range(pulp_client.HTTP_TOTAL_RETRIES - 1) - ] - retries.extend([make_mock_response(200, ok_response)]) - mocked_getresponse.side_effect = retries - - if should_retry: - getattr(mock_pulp, retry_call)(*retry_args) - assert len(mocked_getresponse.mock_calls) == expected_retries - else: - with pytest.raises(HTTPError): - getattr(mock_pulp, retry_call)(*retry_args) - finally: - pulp_client.HTTP_TOTAL_RETRIES = ORIG_HTTP_TOTAL_RETRIES - pulp_client.HTTP_RETRY_BACKOFF = ORIG_HTTP_RETRY_BACKOFF - - -@pytest.mark.parametrize( - "method,called", - [ - ("get", True), - ("post", True), - ("put", False), - ("delete", False), - ], -) -def test_do_request(mock_pulp, method, called): - mock_pulp.local.session = MagicMock() - - response = mock_pulp.do_request(method, "/foo/bar") - - handler = getattr(mock_pulp.local.session, method) - - if called: - handler.assert_called_once() - assert response is not None - else: - handler.assert_not_called() - assert response is None - - -@pytest.mark.parametrize( - "auth", - [ - (["/path/file.crt"]), - (["user", "pwd"]), - ], -) -def test_make_session(mock_pulp, auth): - mock_pulp.auth = auth - mock_pulp._make_session() # pylint: disable=protected-access - - assert hasattr(mock_pulp.local, "session") - - session = mock_pulp.local.session - - assert isinstance(session, requests.Session) - assert isinstance(session.get_adapter("http://"), PulpRetryAdapter) - assert isinstance(session.get_adapter("https://"), PulpRetryAdapter) - - -@pytest.mark.parametrize( - "count", - [ - (2), - (5), - (10), - ], -) -def test_session_is_not_shared(mock_pulp, count): - def make_session(sessions): - mock_pulp._make_session() # pylint: disable=protected-access - sessions.append(mock_pulp.local.session) - - threads = [] - sessions = [] - - for _ in range(count): - t = threading.Thread(target=make_session, args=(sessions,)) - threads.append(t) - - for t in threads: - t.start() - - for t in threads: - t.join() - - assert len(sessions) == len(threads) == count - assert len(set(sessions)) == len(threads) - - for session in sessions: - assert isinstance(session, requests.Session) - assert isinstance(session.get_adapter("http://"), PulpRetryAdapter) - assert isinstance(session.get_adapter("https://"), PulpRetryAdapter) - - -def test_insecure(): - with patch("urllib3.disable_warnings") as patched_warnings: - kwargs = { - "auth": ("fake", "user"), - "verify": False, - } - Pulp("foo.host", **kwargs) - patched_warnings.assert_called_once() diff --git a/tests/test_ubipop.py b/tests/test_ubipop.py index beba31a..576c6a6 100644 --- a/tests/test_ubipop.py +++ b/tests/test_ubipop.py @@ -10,7 +10,7 @@ import pytest import ubiconfig -from mock import MagicMock, call, patch +from mock import MagicMock, patch from more_executors import Executors from more_executors.futures import f_proxy, f_return from pubtools.pulplib import ( @@ -32,12 +32,7 @@ UbiRepoSet, ) from ubipop._cdn import Publisher -from ubipop._utils import ( - AssociateActionModuleDefaults, - AssociateActionModules, - UnassociateActionModuleDefaults, - UnassociateActionModules, -) +from ubipop._utils import Association, Unassociation from .conftest import ( get_modulemd_defaults_unit, @@ -624,10 +619,7 @@ def fixture_mock_run_ubi_population(): def test_create_output_file_all_repos( - mock_ubipop_runner, - mock_get_repo_pairs, - mocked_ubiconfig_load_all, - mock_run_ubi_population, + mock_get_repo_pairs, mocked_ubiconfig_load_all, mock_run_ubi_population ): # pylint: disable=unused-argument path = tempfile.mkdtemp("ubipop") @@ -927,14 +919,8 @@ def test_log_pulp_action(capsys, set_logging, mock_ubipop_runner): arch="x86_64", src_repo_id=src_repo.id, ) - associations = [ - AssociateActionModules( - [unit_1], - dst_repo, - [src_repo], - ) - ] - unassociations = [UnassociateActionModules([unit_2], dst_repo)] + associations = [Association([unit_1], ModulemdUnit, dst_repo, [src_repo])] + unassociations = [Unassociation([unit_2], ModulemdUnit, dst_repo)] mock_ubipop_runner.log_pulp_actions(associations, unassociations) out, err = capsys.readouterr() @@ -955,8 +941,8 @@ def test_log_pulp_action_no_actions(capsys, set_logging, mock_ubipop_runner): set_logging.addHandler(logging.StreamHandler(sys.stdout)) src_repo = get_test_repo(id="test_src") dst_repo = get_test_repo(id="test_dst") - associations = [AssociateActionModules([], dst_repo, [src_repo])] - unassociations = [UnassociateActionModules([], dst_repo)] + associations = [Association([], ModulemdUnit, dst_repo, [src_repo])] + unassociations = [Unassociation([], ModulemdUnit, dst_repo)] mock_ubipop_runner.log_pulp_actions(associations, unassociations) out, err = capsys.readouterr() @@ -965,82 +951,13 @@ def test_log_pulp_action_no_actions(capsys, set_logging, mock_ubipop_runner): assert err == "" assert ( assoc_line.strip() - == "No association expected for modules from ['test_src'] to test_dst" - ) - assert unassoc_line.strip() == "No unassociation expected for modules from test_dst" - - -def test_associate_units(mock_ubipop_runner): - src_repo = get_test_repo(id="test_src") - dst_repo = get_test_repo(id="test_dst") - unit = get_modulemd_unit( - name="test_assoc", - stream="fake-stream", - version=1, - context="fake-context", - arch="x86_64", - src_repo_id=src_repo.id, + == "No association expected for ModulemdUnit from ['test_src'] to test_dst" ) - associations = [ - AssociateActionModules([unit], dst_repo, [src_repo]), - ] - - mock_ubipop_runner.pulp_client.associate_modules.return_value = ["task_id"] - ret = mock_ubipop_runner._associate_unassociate_units( - associations - ) # pylint: disable=protected-access - - assert len(ret) == 1 - assert ret[0].result() == ["task_id"] - - -def test_associate_unassociate_md_defaults(mock_ubipop_runner): - src_repo = get_test_repo(id="test_src") - dst_repo = get_test_repo(id="tets_dst") - unit_1 = get_modulemd_defaults_unit( - name="virt", - stream="rhel", - profiles={"2.5": ["common"]}, - repo_id="test_src", - src_repo_id="test_src", - ) - - unit_2 = get_modulemd_defaults_unit( - name="virt", - stream="rhel", - profiles={"2.5": ["unique"]}, - repo_id="test_src", - src_repo_id="test_src", - ) - - associations = AssociateActionModuleDefaults( - [unit_1], - dst_repo, - [src_repo], - ) - - unassociations = UnassociateActionModuleDefaults( - [unit_2], - dst_repo, - ) - - mock_ubipop_runner.pulp_client.unassociate_module_defaults.return_value = [ - "task_id_0" - ] - mock_ubipop_runner.pulp_client.associate_module_defaults.return_value = [ - "task_id_1" - ] - - # pylint: disable=protected-access - mock_ubipop_runner._associate_unassociate_md_defaults( - (associations,), - (unassociations,), + assert ( + unassoc_line.strip() + == "No unassociation expected for ModulemdUnit from test_dst" ) - # the calls has to be in order - calls = [call(["task_id_0"]), call(["task_id_1"])] - mock_ubipop_runner.pulp_client.wait_for_tasks.assert_has_calls(calls) - @pytest.mark.parametrize( "skip_debug_repo", @@ -1190,6 +1107,10 @@ def test_populate_ubi_repos( relative_url="content/unit/4/client", ) + input_binary_repo = YumRepository(id="input_binary") + input_source_repo = YumRepository(id="input_source") + input_debug_repo = YumRepository(id="input_debug") + output_binary_repo = YumRepository( id="ubi_binary", content_set="ubi-8-for-x86_64-appstream-rpms", @@ -1200,10 +1121,6 @@ def test_populate_ubi_repos( distributors=[d1], relative_url="content/unit/1/client", ) - input_binary_repo = YumRepository(id="input_binary") - input_source_repo = YumRepository(id="input_source") - input_debug_repo = YumRepository(id="input_debug") - output_source_repo = YumRepository( id="ubi_source", population_sources=["input_source"], @@ -1284,21 +1201,6 @@ def test_populate_ubi_repos( input_binary_repo, [new_rpm, new_modulemd, new_modulemd_defaults] ) - url = "/pulp/api/v2/repositories/{dst_repo}/actions/associate/".format( - dst_repo="ubi_binary" - ) - - requests_mock.register_uri( - "POST", url, json={"spawned_tasks": [{"task_id": "foo_task_id"}]} - ) - - url = "/pulp/api/v2/repositories/{dst_repo}/actions/unassociate/".format( - dst_repo="ubi_binary" - ) - requests_mock.register_uri( - "POST", url, json={"spawned_tasks": [{"task_id": "foo_task_id"}]} - ) - url = "/pulp/api/v2/tasks/{task_id}/".format(task_id="foo_task_id") requests_mock.register_uri( "GET", url, json={"state": "finished", "task_id": "foo_task_id"} @@ -1312,22 +1214,30 @@ def test_populate_ubi_repos( ubi_populate.populate_ubi_repos() history = fake_pulp.publish_history - # there should be 3 repositories succesfully published + # there should be 3 repositories successfully published assert len(history) == 3 - expected_published_repo_ids = set(["ubi_binary", "ubi_debug", "ubi_source"]) - repo_ids_published = set() for publish in history: - assert publish.repository.id in expected_published_repo_ids - repo_ids_published.add(publish.repository.id) - - assert len(publish.tasks) == 1 - assert publish.tasks[0].completed - assert publish.tasks[0].succeeded - - assert repo_ids_published == expected_published_repo_ids - # unfortunately we can't check actual content od repos because - # un/associate calls are using custom client not the pubtools-pulplib Client - # TODO add check for actual content after we move to pubtools-pulplib Client + repo = publish.repository + repo_content = repo.search_content().result().data + + assert repo.id in ["ubi_binary", "ubi_debug", "ubi_source"] + + if repo.id == "ubi_binary": + assert sorted([type(unit).__name__ for unit in repo_content]) == [ + "ModulemdDefaultsUnit", + "ModulemdUnit", + "RpmUnit", + ] + for unit in repo_content: + if isinstance(unit, RpmUnit): + assert unit.filename == new_rpm.filename + if isinstance(unit, ModulemdUnit): + assert unit.nsvca == new_modulemd.nsvca + if isinstance(unit, ModulemdDefaultsUnit): + assert unit.profiles == new_modulemd_defaults.profiles + else: + # ubi_debug and ubi_source should be empty + assert repo_content == [] request = requests_mock.request_history[ -1 @@ -1349,18 +1259,12 @@ def test_populate_ubi_repos( @patch("ubipop._cdn.Publisher") -@patch("ubipop._pulp_client.Pulp.wait_for_tasks") @patch("pubtools.pulplib.YumRepository.get_debug_repository") @patch("pubtools.pulplib.YumRepository.get_source_repository") def test_populate_ubi_repos_no_publish( - get_debug_repository, - get_source_repository, - wait_for_tasks, - publisher, - requests_mock, - monkeypatch, + get_debug_repository, get_source_repository, publisher, requests_mock, monkeypatch ): - """Test run of populate_ubi_repos which checks that correct asssociations and + """Test run of populate_ubi_repos which checks that correct associations and unassociations were made in Pulp but no publish was performed. It's simplified to contain only actions on RPM packages.""" monkeypatch.setenv("UBIPOP_SKIP_PUBLISH", "true") @@ -1485,20 +1389,6 @@ def test_populate_ubi_repos_no_publish( input_binary_repo, [new_rpm, new_modulemd, new_modulemd_defaults] ) - url = "/pulp/api/v2/repositories/{dst_repo}/actions/associate/".format( - dst_repo="ubi_binary" - ) - requests_mock.register_uri( - "POST", url, json={"spawned_tasks": [{"task_id": "association_task_id"}]} - ) - - url = "/pulp/api/v2/repositories/{dst_repo}/actions/unassociate/".format( - dst_repo="ubi_binary" - ) - requests_mock.register_uri( - "POST", url, json={"spawned_tasks": [{"task_id": "unassociation_task_id"}]} - ) - url = "/pulp/api/v2/tasks/{task_id}/".format(task_id="association_task_id") requests_mock.register_uri( "GET", url, json={"state": "finished", "task_id": "association_task_id"} @@ -1515,18 +1405,6 @@ def test_populate_ubi_repos_no_publish( ubi_populate.populate_ubi_repos() publisher.assert_not_called() - # there should be 3 associations and 3 unassociations - wait_for_tasks.assert_has_calls( - [ - call(["association_task_id"]), - call(["association_task_id"]), - call(["association_task_id"]), - call(["unassociation_task_id"]), - call(["unassociation_task_id"]), - call(["unassociation_task_id"]), - ], - any_order=True, - ) @patch("ubipop._cdn.Publisher") @@ -1664,21 +1542,6 @@ def test_populate_ubi_repos_dry_run( input_binary_repo, [new_rpm, new_modulemd, new_modulemd_defaults] ) - url = "/pulp/api/v2/repositories/{dst_repo}/actions/associate/".format( - dst_repo="ubi_binary" - ) - - requests_mock.register_uri( - "POST", url, json={"spawned_tasks": [{"task_id": "foo_task_id"}]} - ) - - url = "/pulp/api/v2/repositories/{dst_repo}/actions/unassociate/".format( - dst_repo="ubi_binary" - ) - requests_mock.register_uri( - "POST", url, json={"spawned_tasks": [{"task_id": "foo_task_id"}]} - ) - url = "/pulp/api/v2/tasks/{task_id}/".format(task_id="foo_task_id") requests_mock.register_uri( "GET", url, json={"state": "finished", "task_id": "foo_task_id"} @@ -1926,9 +1789,8 @@ def _create_cdn_mocks(requests_mock): @patch("ubipop.Client") -@patch("ubipop.Pulp") -def test_make_pulp_client_cert(mocked_client, mocked_legacy_client): - """Tests that pulp client (pubtools and legacy) can be created with cert/key used for auth""" +def test_make_pulp_client_cert(mocked_client): + """Tests that pulp client can be created with cert/key used for auth""" with NamedTemporaryFile() as cert, NamedTemporaryFile() as key: cert_path = os.path.abspath(cert.name) key_path = os.path.abspath(key.name) @@ -1940,15 +1802,11 @@ def test_make_pulp_client_cert(mocked_client, mocked_legacy_client): mocked_client.assert_called_once_with( "https://example.pulp.com", verify=True, cert=auth ) - mocked_legacy_client.assert_called_once_with( - "https://example.pulp.com", verify=True, cert=auth - ) @patch("ubipop.Client") -@patch("ubipop.Pulp") -def test_make_pulp_client_username(mocked_client, mocked_legacy_client): - """Tests that pulp client (pubtools and legacy) can be created with username/password used for auth""" +def test_make_pulp_client_username(mocked_client): + """Tests that pulp client can be created with username/password used for auth""" auth = ("username", "password") ubi_populate = UbiPopulate("example.pulp.com", auth, dry_run=False) @@ -1957,6 +1815,3 @@ def test_make_pulp_client_username(mocked_client, mocked_legacy_client): mocked_client.assert_called_once_with( "https://example.pulp.com", verify=True, auth=auth ) - mocked_legacy_client.assert_called_once_with( - "https://example.pulp.com", verify=True, auth=auth - ) diff --git a/tests/test_utils.py b/tests/test_utils.py index bf6ee31..f4b3ad8 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,88 +1,7 @@ -import pytest +from pubtools.pulplib import ModulemdDefaultsUnit -from mock import MagicMock -from pubtools.pulplib import YumRepository, ModulemdDefaultsUnit -from ubipop._utils import ( - AssociateAction, - AssociateActionModuleDefaults, - AssociateActionModules, - AssociateActionRpms, - PulpAction, - UnassociateActionModuleDefaults, - UnassociateActionModules, - UnassociateActionRpms, - flatten_md_defaults_name_profiles, -) from ubipop._matcher import UbiUnit - - -def test_raise_not_implemented_pulp_action(): - units = ["unit1", "unit2"] - repo = YumRepository(id="test") - action = PulpAction(units, repo) - pytest.raises(NotImplementedError, action.get_actions, None) - - -def test_raise_not_implemented_associate_action(): - units = ["unit1", "unit2"] - repo = YumRepository(id="test") - src_repo = YumRepository(id="test") - action = AssociateAction(units, repo, src_repo) - pytest.raises(NotImplementedError, action.get_actions, None) - - -@pytest.mark.parametrize( - "klass, method", - [ - (AssociateActionModules, "associate_modules"), - (AssociateActionModuleDefaults, "associate_module_defaults"), - (AssociateActionRpms, "associate_packages"), - ], -) -def test_get_action_associate(klass, method): - mocked_unit_1 = MagicMock() - mocked_unit_1.associate_source_repo_id = "test_src_1" - mocked_unit_2 = MagicMock() - mocked_unit_2.associate_source_repo_id = "test_src_2" - units = [mocked_unit_1, mocked_unit_2] - dst_repo = YumRepository(id="test_dst") - - src_repos = [ - YumRepository(id="test_src_1"), - YumRepository(id="test_src_2"), - ] - action = klass(units, dst_repo, src_repos) - actions = action.get_actions(MagicMock()) - for action in actions: - associate_action, src_repo_current, dst_repo_current, current_units = action - assert "mock." + method in str(associate_action) - assert len(current_units) == 1 - assert current_units == [ - u for u in units if u.associate_source_repo_id == src_repo_current.id - ] - assert dst_repo_current.id == dst_repo.id - assert src_repo_current.id == current_units[0].associate_source_repo_id - - -@pytest.mark.parametrize( - "klass, method", - [ - (UnassociateActionModules, "unassociate_modules"), - (UnassociateActionModuleDefaults, "unassociate_module_defaults"), - (UnassociateActionRpms, "unassociate_packages"), - ], -) -def test_get_action_unassociate(klass, method): - units = ["unit1", "unit2"] - dst_repo = YumRepository(id="test_dst") - action = klass(units, dst_repo) - associate_action, dst_repo_current, current_units = action.get_actions(MagicMock())[ - 0 - ] - - assert "mock." + method in str(associate_action) - assert current_units == units - assert dst_repo_current.id == dst_repo.id +from ubipop._utils import flatten_md_defaults_name_profiles def test_flatten_md_defaults_name_profiles(): diff --git a/ubipop/__init__.py b/ubipop/__init__.py index 69d9db7..c64f8ec 100644 --- a/ubipop/__init__.py +++ b/ubipop/__init__.py @@ -2,25 +2,21 @@ import os import re from collections import defaultdict, namedtuple -from concurrent.futures import as_completed import attr import ubiconfig from more_executors import Executors from more_executors.futures import f_proxy, f_return -from pubtools.pulplib import Client, Criteria - -from ubipop._pulp_client import Pulp -from ubipop._utils import ( - AssociateActionModuleDefaults, - AssociateActionModules, - AssociateActionRpms, - UnassociateActionModuleDefaults, - UnassociateActionModules, - UnassociateActionRpms, - flatten_md_defaults_name_profiles, +from pubtools.pulplib import ( + Client, + Criteria, + ModulemdDefaultsUnit, + ModulemdUnit, + RpmUnit, ) +from ubipop._utils import Association, Unassociation, flatten_md_defaults_name_profiles + from ._cdn import Publisher from ._matcher import Matcher from .ubi_manifest_client.client import Client as UbimClient @@ -32,10 +28,6 @@ class RepoMissing(Exception): pass -class ConfigMissing(Exception): - pass - - RepoSet = namedtuple("RepoSet", ["rpm", "source", "debug"]) @@ -111,10 +103,6 @@ def __init__( output_repos_file=None, **kwargs ): - # legacy client implemeted in this repo, it's expected to be replaced by pubtools.pulplib.Client - self.legacy_pulp_client = self._make_pulp_client( - pulp_hostname, pulp_auth, verify, Pulp - ) self._pulp_hostname = pulp_hostname self._pulp_auth = pulp_auth self._verify = verify @@ -129,8 +117,8 @@ def __init__( self._repo_ids = kwargs.get("repo_ids", None) self._version = kwargs.get("version", None) self._content_set_regex = kwargs.get("content_set_regex", None) - self._ubiconfig_map = None self._ubi_manifest_url = kwargs.get("ubi_manifest_url") or None + self._action_batch_size = kwargs.get("action_batch_size", 100) arl_templates = os.getenv("UBIPOP_ARL_TEMPLATES", "") self._publisher_args = { "edgerc": os.getenv("UBIPOP_EDGERC_CFG", "/etc/.edgerc"), @@ -150,22 +138,16 @@ def __init__( @property def pulp_client(self): if self._pulp_client is None: - self._pulp_client = self._make_pulp_client( - self._pulp_hostname, self._pulp_auth, self._verify, Client - ) + kwargs = {"verify": self._verify} + if os.path.isfile(self._pulp_auth[0]) and os.path.isfile( + self._pulp_auth[1] + ): + kwargs["cert"] = self._pulp_auth + else: + kwargs["auth"] = self._pulp_auth + self._pulp_client = Client("https://" + self._pulp_hostname, **kwargs) return self._pulp_client - def _make_pulp_client(self, url, auth, verify, client_klass): - kwargs = { - "verify": verify, - } - if os.path.isfile(auth[0]) and os.path.isfile(auth[1]): - kwargs["cert"] = auth - else: - kwargs["auth"] = auth - - return client_klass("https://" + url, **kwargs) - @property def ubiconfig_list(self): if self._ubiconfig_list is None: @@ -355,12 +337,13 @@ def _run_ubi_population( for repo_sets in repo_sets_list: for repo_set in repo_sets: UbiPopulateRunner( - self.legacy_pulp_client, + self.pulp_client, repo_set, self.dry_run, self._executor, ubim_client, publisher, + self._action_batch_size, ).run_ubi_population() out_repos.update(repo_set.get_output_repos()) @@ -375,6 +358,7 @@ def __init__( executor, ubi_manifest_client=None, publisher=None, + action_batch_size=100, ): self.pulp_client = pulp_client self.ubim_client = ubi_manifest_client @@ -382,28 +366,105 @@ def __init__( self.dry_run = dry_run self._executor = executor self._publisher = publisher + self._action_batch_size = action_batch_size - def _determine_pulp_actions(self, units, current, diff_f): - expected = list(units) - to_associate = diff_f(expected, current) - to_unassociate = diff_f(current, expected) - return to_associate, to_unassociate + def run_ubi_population(self): + current_content = self._get_current_content() - def _get_pulp_actions_mds(self, modules, current): - return self._determine_pulp_actions( - modules, current, self._diff_modules_by_nsvca + # start async querying for modulemds and modular and non-modular packages + binary_manifest = self.ubim_client.get_manifest(self.repo_set.out_repos.rpm.id) + debug_manifest = self.ubim_client.get_manifest(self.repo_set.out_repos.debug.id) + source_manifest = self.ubim_client.get_manifest( + self.repo_set.out_repos.source.id ) - def _get_pulp_actions_md_defaults(self, module_defaults, current): - return self._determine_pulp_actions( - module_defaults, - current, - self._diff_md_defaults_by_profiles, + self.repo_set.modules = binary_manifest.modules + self.repo_set.module_defaults = self._search_expected_modulemd_defaults( + binary_manifest.modulemd_defaults ) + self.repo_set.packages = binary_manifest.packages + self.repo_set.debug_rpms = debug_manifest.packages + self.repo_set.source_rpms = source_manifest.packages - def _get_pulp_actions_pkgs(self, pkgs, current): - return self._determine_pulp_actions( - pkgs, current, self._diff_packages_by_filename + ( + associations, + unassociations, + mdd_association, + mdd_unassociation, + ) = self._get_pulp_actions(current_content) + + if self.dry_run: + self.log_curent_content(current_content) + self.log_pulp_actions( + associations + (mdd_association,), + unassociations + (mdd_unassociation,), + ) + else: + self._do_associations(associations) + self._do_unassociations(unassociations) + + self._do_unassociations([mdd_unassociation]) + self._do_associations([mdd_association]) + + if self._publisher: + self._publish_out_repos() + + def _get_current_content(self): + """ + Gather current content of output repos + """ + criteria = [Criteria.true()] + current_modulemds = f_proxy( + self._executor.submit( + Matcher.search_modulemds, criteria, [self.repo_set.out_repos.rpm] + ) + ) + current_modulemd_defaults = f_proxy( + self._executor.submit( + Matcher.search_modulemd_defaults, + criteria, + [self.repo_set.out_repos.rpm], + ) + ) + current_rpms = f_proxy( + self._executor.submit( + Matcher.search_rpms, criteria, [self.repo_set.out_repos.rpm] + ) + ) + current_srpms = f_proxy( + self._executor.submit( + Matcher.search_srpms, criteria, [self.repo_set.out_repos.source] + ) + ) + + if self.repo_set.out_repos.debug.result(): + current_debug_rpms = f_proxy( + self._executor.submit( + Matcher.search_rpms, criteria, [self.repo_set.out_repos.debug] + ) + ) + else: + current_debug_rpms = f_proxy(f_return([])) + + current_content = RepoContent( + current_rpms, + current_srpms, + current_debug_rpms, + current_modulemds, + current_modulemd_defaults, + ) + return current_content + + def _search_expected_modulemd_defaults(self, modulemd_defaults): + criteria_values = [(unit.name,) for unit in modulemd_defaults] + fields = ("name",) + or_criteria = Matcher.create_or_criteria(fields, criteria_values) + return f_proxy( + self._executor.submit( + Matcher.search_modulemd_defaults, + or_criteria, + self.repo_set.in_repos.rpm, + ) ) def _get_pulp_actions(self, current_content): @@ -438,39 +499,74 @@ def _get_pulp_actions(self, current_content): ) associations = ( - AssociateActionModules( - modules_assoc, self.repo_set.out_repos.rpm, self.repo_set.in_repos.rpm + Association( + modules_assoc, + ModulemdUnit, + self.repo_set.out_repos.rpm, + self.repo_set.in_repos.rpm, ), - AssociateActionRpms( - rpms_assoc, self.repo_set.out_repos.rpm, self.repo_set.in_repos.rpm + Association( + rpms_assoc, + RpmUnit, + self.repo_set.out_repos.rpm, + self.repo_set.in_repos.rpm, ), - AssociateActionRpms( + Association( srpms_assoc, + RpmUnit, self.repo_set.out_repos.source, self.repo_set.in_repos.source, ), - AssociateActionRpms( - debug_assoc, self.repo_set.out_repos.debug, self.repo_set.in_repos.debug + Association( + debug_assoc, + RpmUnit, + self.repo_set.out_repos.debug, + self.repo_set.in_repos.debug, ), ) unassociations = ( - UnassociateActionModules(modules_unassoc, self.repo_set.out_repos.rpm), - UnassociateActionRpms(rpms_unassoc, self.repo_set.out_repos.rpm), - UnassociateActionRpms(srpms_unassoc, self.repo_set.out_repos.source), - UnassociateActionRpms(debug_unassoc, self.repo_set.out_repos.debug), + Unassociation(modules_unassoc, ModulemdUnit, self.repo_set.out_repos.rpm), + Unassociation(rpms_unassoc, RpmUnit, self.repo_set.out_repos.rpm), + Unassociation(srpms_unassoc, RpmUnit, self.repo_set.out_repos.source), + Unassociation(debug_unassoc, RpmUnit, self.repo_set.out_repos.debug), ) - mdd_association = AssociateActionModuleDefaults( - md_defaults_assoc, self.repo_set.out_repos.rpm, self.repo_set.in_repos.rpm + mdd_association = Association( + md_defaults_assoc, + ModulemdDefaultsUnit, + self.repo_set.out_repos.rpm, + self.repo_set.in_repos.rpm, ) - - mdd_unassociation = UnassociateActionModuleDefaults( - md_defaults_unassoc, self.repo_set.out_repos.rpm + mdd_unassociation = Unassociation( + md_defaults_unassoc, ModulemdDefaultsUnit, self.repo_set.out_repos.rpm ) return associations, unassociations, mdd_association, mdd_unassociation + def _get_pulp_actions_mds(self, modules, current): + return self._determine_pulp_actions( + modules, current, self._diff_modules_by_nsvca + ) + + def _get_pulp_actions_md_defaults(self, module_defaults, current): + return self._determine_pulp_actions( + module_defaults, + current, + self._diff_md_defaults_by_profiles, + ) + + def _get_pulp_actions_pkgs(self, pkgs, current): + return self._determine_pulp_actions( + pkgs, current, self._diff_packages_by_filename + ) + + def _determine_pulp_actions(self, units, current, diff_f): + expected = list(units) + to_associate = diff_f(expected, current) + to_unassociate = diff_f(current, expected) + return to_associate, to_unassociate + def _diff_modules_by_nsvca(self, modules_1, modules_2): return self._diff_lists_by_attr(modules_1, modules_2, "nsvca") @@ -493,91 +589,6 @@ def diff_attr(obj): return diff - def _search_expected_modulemd_defaults(self, modulemd_defaults): - criteria_values = [(unit.name,) for unit in modulemd_defaults] - fields = ("name",) - or_criteria = Matcher.create_or_criteria(fields, criteria_values) - return f_proxy( - self._executor.submit( - Matcher.search_modulemd_defaults, - or_criteria, - self.repo_set.in_repos.rpm, - ) - ) - - def run_ubi_population(self): - current_content = self._get_current_content() - - # start async querying for modulemds and modular and non-modular packages - binary_manifest = self.ubim_client.get_manifest(self.repo_set.out_repos.rpm.id) - debug_manifest = self.ubim_client.get_manifest(self.repo_set.out_repos.debug.id) - source_manifest = self.ubim_client.get_manifest( - self.repo_set.out_repos.source.id - ) - self.repo_set.modules = binary_manifest.modules - self.repo_set.module_defaults = self._search_expected_modulemd_defaults( - binary_manifest.modulemd_defaults - ) - self.repo_set.packages = binary_manifest.packages - self.repo_set.debug_rpms = debug_manifest.packages - self.repo_set.source_rpms = source_manifest.packages - - ( - associations, - unassociations, - mdd_association, - mdd_unassociation, - ) = self._get_pulp_actions(current_content) - - if self.dry_run: - self.log_curent_content(current_content) - self.log_pulp_actions( - associations + (mdd_association,), - unassociations + (mdd_unassociation,), - ) - else: - fts = [] - fts.extend(self._associate_unassociate_units(associations + unassociations)) - # wait for associate/unassociate tasks - self._wait_pulp(fts) - - self._associate_unassociate_md_defaults( - (mdd_association,), (mdd_unassociation,) - ) - if self._publisher: - self._publish_out_repos() - - def _associate_unassociate_units(self, action_list): - fts = [] - for action in action_list: - if action.units: - fts.extend( - [ - self._executor.submit(*a) - for a in action.get_actions(self.pulp_client) - ] - ) - - return fts - - def _associate_unassociate_md_defaults(self, action_md_ass, action_md_unass): - """ - Unassociate old module defaults units first, wait until done and - then start new units association - """ - fts_unass = self._associate_unassociate_units(action_md_unass) - self._wait_pulp(fts_unass) - - fts_ass = self._associate_unassociate_units(action_md_ass) - self._wait_pulp(fts_ass) - - def _wait_pulp(self, futures): - # wait for pulp tasks from futures - for ft in as_completed(futures): - tasks = ft.result() - if tasks: - self.pulp_client.wait_for_tasks(tasks) - def log_curent_content(self, current_content): _LOG.info("Current modules in repo: %s", self.repo_set.out_repos.rpm.id) for module in current_content.modules: @@ -614,7 +625,7 @@ def log_pulp_actions(self, associations, unassociations): else: _LOG.info( "No association expected for %s from %s to %s", - item.TYPE, + item.unit_type.__name__, [r.id for r in item.src_repos], item.dst_repo.id, ) @@ -626,55 +637,73 @@ def log_pulp_actions(self, associations, unassociations): else: _LOG.info( "No unassociation expected for %s from %s", - item.TYPE, + item.unit_type.__name__, item.dst_repo.id, ) - def _get_current_content(self): - """ - Gather current content of output repos - """ - criteria = [Criteria.true()] - current_modulemds = f_proxy( - self._executor.submit( - Matcher.search_modulemds, criteria, [self.repo_set.out_repos.rpm] - ) - ) - current_modulemd_defaults = f_proxy( - self._executor.submit( - Matcher.search_modulemd_defaults, - criteria, - [self.repo_set.out_repos.rpm], - ) - ) - current_rpms = f_proxy( - self._executor.submit( - Matcher.search_rpms, criteria, [self.repo_set.out_repos.rpm] - ) - ) - current_srpms = f_proxy( - self._executor.submit( - Matcher.search_srpms, criteria, [self.repo_set.out_repos.source] - ) - ) - - if self.repo_set.out_repos.debug.result(): - current_debug_rpms = f_proxy( - self._executor.submit( - Matcher.search_rpms, criteria, [self.repo_set.out_repos.debug] + def _do_associations(self, associations): + association_fts = [] + for a in associations: + for src_repo, units in a.src_repo_to_unit_map.items(): + if units: + src_obj = self.pulp_client.get_repository(a.get_repo(src_repo).id) + dst_obj = self.pulp_client.get_repository(a.dst_repo.id) + for chunk in list(self._batch_units(units)): + criteria = self._criteria_for_units(chunk, a.unit_type) + association_fts.append( + self.pulp_client.copy_content(src_obj, dst_obj, criteria) + ) + results = [] + for ft in association_fts: + results.extend(ft.result()) + return results + + def _do_unassociations(self, unassociations): + unassociation_fts = [] + for u in unassociations: + if u.units: + dst_repo_obj = self.pulp_client.get_repository(u.dst_repo.id) + for chunk in list(self._batch_units(u.units)): + criteria = self._criteria_for_units(chunk, u.unit_type) + unassociation_fts.append(dst_repo_obj.remove_content(criteria)) + results = [] + for ft in unassociation_fts: + results.extend(ft.result()) + return results + + def _batch_units(self, units): + for i in range(0, len(units), self._action_batch_size): + yield units[i : i + self._action_batch_size] + + def _criteria_for_units(self, units, unit_type): + partial_crit = [] + for unit in units: + if unit_type is RpmUnit: + partial_crit.append(Criteria.with_field("filename", unit.filename)) + if unit_type is ModulemdUnit: + md_crit = [] + nsvca_dict = { + "name": unit.name, + "stream": unit.stream, + "version": unit.version, + "context": unit.context, + "arch": unit.arch, + } + for md_part, value in nsvca_dict.items(): + md_crit.append(Criteria.with_field(md_part, value)) + partial_crit.append(Criteria.and_(*md_crit)) + if unit_type is ModulemdDefaultsUnit: + partial_crit.append( + Criteria.and_( + Criteria.with_field("name", unit.name), + Criteria.with_field("stream", unit.stream), + ) ) - ) - else: - current_debug_rpms = f_proxy(f_return([])) - current_content = RepoContent( - current_rpms, - current_srpms, - current_debug_rpms, - current_modulemds, - current_modulemd_defaults, - ) - return current_content + if partial_crit: + return Criteria.and_( + Criteria.with_unit_type(unit_type), Criteria.or_(*partial_crit) + ) def _publish_out_repos(self): to_publish = [] diff --git a/ubipop/_pulp_client.py b/ubipop/_pulp_client.py deleted file mode 100644 index 11dcf8d..0000000 --- a/ubipop/_pulp_client.py +++ /dev/null @@ -1,220 +0,0 @@ -import logging -import os -import threading -import time - -from urllib3.util.retry import Retry - -try: - from urllib.parse import urljoin -except ImportError: - from urlparse import urljoin - -import requests - -_LOG = logging.getLogger("ubipop") - -HTTP_TOTAL_RETRIES = int(os.environ.get("UBIPOP_HTTP_TOTAL_RETRIES", 10)) -HTTP_RETRY_BACKOFF = float(os.environ.get("UBIPOP_HTTP_RETRY_BACKOFF", 1)) -HTTP_TIMEOUT = int(os.environ.get("UBIPOP_HTTP_TIMEOUT", 120)) - - -class UnsupportedTypeId(Exception): - pass - - -class PulpRetryAdapter(requests.adapters.HTTPAdapter): - def __init__(self, *args, **kwargs): - kwargs["max_retries"] = Retry( - total=kwargs.get("total_retries", HTTP_TOTAL_RETRIES), - status_forcelist=[500, 502, 503, 504], - method_whitelist=[ - "HEAD", - "TRACE", - "GET", - "POST", - "PUT", - "OPTIONS", - "DELETE", - ], - backoff_factor=kwargs.get("backoff_factor", HTTP_RETRY_BACKOFF), - ) - super(PulpRetryAdapter, self).__init__(*args, **kwargs) - - -class Pulp(object): - PULP_API = "/pulp/api/v2/" - - def __init__(self, hostname, **kwargs): - self.hostname = hostname - self.scheme = "https://" if not hostname.startswith("https://") else "" - self.base_url = urljoin(self.scheme + hostname, self.PULP_API) - self.local = threading.local() - - self._session_kwargs = {} - - if kwargs.get("verify") is False: - import urllib3 - - urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - - for arg in ( - "auth", - "cert", - "verify", - ): - if arg in kwargs: - self._session_kwargs[arg] = kwargs.pop(arg) - - def _make_session(self): - adapter = PulpRetryAdapter() - session = requests.Session() - session.mount("http://", adapter) - session.mount("https://", adapter) - - for key, value in self._session_kwargs.items(): - setattr(session, key, value) - - self.local.session = session - - def do_request(self, req_type, url, data=None): - if not hasattr(self.local, "session"): - self._make_session() - - req_url = urljoin(self.base_url, url) - - if req_type == "post": - ret = self.local.session.post(req_url, json=data, timeout=HTTP_TIMEOUT) - elif req_type == "get": - ret = self.local.session.get(req_url, timeout=HTTP_TIMEOUT) - else: - ret = None - - return ret - - def wait_for_tasks(self, task_id_list, delay=5.0): - results = {} - - _tasks = set(task_id_list) - while _tasks: - statuses = self.search_tasks(_tasks) - for status in statuses: - if status["state"] in ("finished", "error", "canceled"): - _tasks -= set([status["task_id"]]) - results[status["task_id"]] = status - if _tasks: - time.sleep(delay) - return results - - def search_tasks(self, task_ids): - url = "tasks/{task_id}/" - statuses = [] - for task_id in task_ids: - ret = self.do_request("get", url.format(task_id=task_id)) - statuses.append(ret.json()) - return statuses - - def _modules_query(self, modules): - query_list = [] - for module in modules: - query_list.append( - { - "$and": [ - {"name": module.name}, - {"context": module.context}, - {"version": module.version}, - {"stream": module.stream}, - {"arch": module.arch}, - ] - } - ) - - return query_list - - def _module_defaults_query(self, module_defaults): - query_list = [] - for md_d in module_defaults: - query_list.append({"$and": [{"name": md_d.name}, {"stream": md_d.stream}]}) - return query_list - - def _rpms_query(self, rpms): - return [{"filename": rpm.filename} for rpm in rpms] - - def _filter_item_log(self, type_ids, unit): - log_message = "" - if "rpm" in type_ids or "srpm" in type_ids: - log_message = unit.filename - elif "modulemd" in type_ids: - log_message = unit.nsvca - elif "modulemd_defaults" in type_ids: - log_message = ":".join((unit.name, unit.stream)) - return log_message - - def unassociate_units(self, repo, units, type_ids): - url = "repositories/{dst_repo}/actions/unassociate/".format(dst_repo=repo.id) - data = { - "criteria": { - "type_ids": list(type_ids), - "filters": {"unit": {"$or": self._get_query_list(type_ids, units)}}, - }, - } - log_msg = "Unassociating %s(%s) from %s" - for unit in units: - info = self._filter_item_log(type_ids, unit) - _LOG.info(log_msg, ",".join(type_ids), info, repo.id) - ret = self.do_request("post", url, data).json() - return [task["task_id"] for task in ret["spawned_tasks"]] - - def associate_units(self, src_repo, dest_repo, units, type_ids): - url = "repositories/{dst_repo}/actions/associate/".format(dst_repo=dest_repo.id) - data = { - "source_repo_id": src_repo.id, - "criteria": { - "type_ids": list(type_ids), - "filters": { - "unit": { - "$or": self._get_query_list(type_ids, units), - }, - }, - }, - } - log_msg = "Associating %s(%s) from %s to %s" - for unit in units: - info = self._filter_item_log(type_ids, unit) - _LOG.info(log_msg, ",".join(type_ids), info, src_repo.id, dest_repo.id) - ret = self.do_request("post", url, data) - ret.raise_for_status() - ret_json = ret.json() - return [task["task_id"] for task in ret_json["spawned_tasks"]] - - def _get_query_list(self, type_ids, units): - if "modulemd" in type_ids: - query_list = self._modules_query(units) - elif "modulemd_defaults" in type_ids: - query_list = self._module_defaults_query(units) - elif "rpm" in type_ids or "srpm" in type_ids: - query_list = self._rpms_query(units) - else: - raise UnsupportedTypeId - - return query_list - - def associate_modules(self, src_repo, dst_repo, modules): - return self.associate_units(src_repo, dst_repo, modules, ("modulemd",)) - - def associate_module_defaults(self, src_repo, dst_repo, module_defaults): - return self.associate_units( - src_repo, dst_repo, module_defaults, ("modulemd_defaults",) - ) - - def associate_packages(self, src_repo, dst_repo, rpms): - return self.associate_units(src_repo, dst_repo, rpms, ("rpm", "srpm")) - - def unassociate_modules(self, repo, modules): - return self.unassociate_units(repo, modules, ("modulemd",)) - - def unassociate_module_defaults(self, repo, module_defaults): - return self.unassociate_units(repo, module_defaults, ("modulemd_defaults",)) - - def unassociate_packages(self, repo, rpms): - return self.unassociate_units(repo, rpms, ("rpm", "srpm")) diff --git a/ubipop/_utils.py b/ubipop/_utils.py index ed238c4..769fc83 100644 --- a/ubipop/_utils.py +++ b/ubipop/_utils.py @@ -1,108 +1,31 @@ -class PulpAction(object): - def __init__(self, units, dst_repo): +class Association(object): + def __init__(self, units, unit_type, dst_repo, src_repos): self.units = units + self.unit_type = unit_type self.dst_repo = dst_repo - - def get_actions(self, pulp_client_inst): - raise NotImplementedError - - -class AssociateAction(PulpAction): - def __init__(self, units, dst_repo, src_repos): - super(AssociateAction, self).__init__(units, dst_repo) self.src_repos = src_repos - - def _map_src_repo_to_unit(self): - src_repo_unit_map = {} - for unit in self.units: - src_repo_unit_map.setdefault(unit.associate_source_repo_id, []).append(unit) - - return src_repo_unit_map - - def _get_repo_obj(self, repo_id): + self._src_repo_to_unit_map = None + + @property + def src_repo_to_unit_map(self): + if self._src_repo_to_unit_map is None: + mapping = {} + for unit in self.units: + mapping.setdefault(unit.associate_source_repo_id, []).append(unit) + self._src_repo_to_unit_map = mapping + return self._src_repo_to_unit_map + + def get_repo(self, repo_id): for repo in self.src_repos: if repo_id == repo.id: return repo - def get_actions(self, pulp_client_inst): - raise NotImplementedError - - -class AssociateActionModules(AssociateAction): - TYPE = "modules" - - def get_actions(self, pulp_client_inst): - actions = [] - for src_repo_id, units in self._map_src_repo_to_unit().items(): - actions.append( - ( - pulp_client_inst.associate_modules, - self._get_repo_obj(src_repo_id), - self.dst_repo, - units, - ) - ) - - return actions - - -class UnassociateActionModules(PulpAction): - TYPE = "modules" - - def get_actions(self, pulp_client_inst): - return [(pulp_client_inst.unassociate_modules, self.dst_repo, self.units)] - -class AssociateActionModuleDefaults(AssociateAction): - TYPE = "module_defaults" - - def get_actions(self, pulp_client_inst): - actions = [] - for src_repo_id, units in self._map_src_repo_to_unit().items(): - actions.append( - ( - pulp_client_inst.associate_module_defaults, - self._get_repo_obj(src_repo_id), - self.dst_repo, - units, - ) - ) - - return actions - - -class UnassociateActionModuleDefaults(PulpAction): - TYPE = "module_defaults" - - def get_actions(self, pulp_client_inst): - return [ - (pulp_client_inst.unassociate_module_defaults, self.dst_repo, self.units) - ] - - -class AssociateActionRpms(AssociateAction): - TYPE = "packages" - - def get_actions(self, pulp_client_inst): - actions = [] - for src_repo_id, units in self._map_src_repo_to_unit().items(): - actions.append( - ( - pulp_client_inst.associate_packages, - self._get_repo_obj(src_repo_id), - self.dst_repo, - units, - ) - ) - - return actions - - -class UnassociateActionRpms(PulpAction): - TYPE = "packages" - - def get_actions(self, pulp_client_inst): - return [(pulp_client_inst.unassociate_packages, self.dst_repo, self.units)] +class Unassociation(object): + def __init__(self, units, unit_type, dst_repo): + self.units = units + self.unit_type = unit_type + self.dst_repo = dst_repo def flatten_md_defaults_name_profiles(obj):