Skip to content

Commit

Permalink
Add cooldown time for failed RemoteArtifact fetch
Browse files Browse the repository at this point in the history
On a request for on-demand content in the content app, a corrupted Remote
that contains the wrong binary (for that content) prevented other Remotes
from being attempted on future requests.

Now the last failed Remotes are temporarily ignored and others may be picked.

Cherry-picked from: 0a5ac4a
  • Loading branch information
pedro-psb committed Jan 16, 2025
1 parent 1f5d2ce commit 62ff400
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 46 deletions.
4 changes: 4 additions & 0 deletions CHANGES/5725.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
On a request for on-demand content in the content app, a corrupted Remote that
contains the wrong binary (for that content) prevented other Remotes from being
attempted on future requests. Now the last failed Remotes are temporarily ignored
and others may be picked.
24 changes: 20 additions & 4 deletions pulp_file/pytest_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ def file_fixtures_root(tmp_path):

@pytest.fixture
def write_3_iso_file_fixture_data_factory(file_fixtures_root):
def _write_3_iso_file_fixture_data_factory(name, overwrite=False):
def _write_3_iso_file_fixture_data_factory(name, overwrite=False, seed=None):
file_fixtures_root.joinpath(name).mkdir(exist_ok=overwrite)
file1 = generate_iso(file_fixtures_root.joinpath(f"{name}/1.iso"))
file2 = generate_iso(file_fixtures_root.joinpath(f"{name}/2.iso"))
file3 = generate_iso(file_fixtures_root.joinpath(f"{name}/3.iso"))
file1 = generate_iso(file_fixtures_root.joinpath(f"{name}/1.iso"), seed=seed)
file2 = generate_iso(file_fixtures_root.joinpath(f"{name}/2.iso"), seed=seed)
file3 = generate_iso(file_fixtures_root.joinpath(f"{name}/3.iso"), seed=seed)
generate_manifest(
file_fixtures_root.joinpath(f"{name}/PULP_MANIFEST"), [file1, file2, file3]
)
Expand Down Expand Up @@ -364,3 +364,19 @@ def _wget_recursive_download_on_host(url, destination):
)

return _wget_recursive_download_on_host


@pytest.fixture
def generate_server_and_remote(
file_bindings, gen_fixture_server, file_fixtures_root, gen_object_with_cleanup
):
def _generate_server_and_remote(*, manifest_path, policy):
server = gen_fixture_server(file_fixtures_root, None)
url = server.make_url(manifest_path)
remote = gen_object_with_cleanup(
file_bindings.RemotesFileApi,
{"name": str(uuid.uuid4()), "url": str(url), "policy": policy},
)
return server, remote

yield _generate_server_and_remote
16 changes: 0 additions & 16 deletions pulp_file/tests/functional/api/test_acs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,6 @@
)


@pytest.fixture
def generate_server_and_remote(
file_bindings, gen_fixture_server, file_fixtures_root, gen_object_with_cleanup
):
def _generate_server_and_remote(*, manifest_path, policy):
server = gen_fixture_server(file_fixtures_root, None)
url = server.make_url(manifest_path)
remote = gen_object_with_cleanup(
file_bindings.RemotesFileApi,
{"name": str(uuid.uuid4()), "url": str(url), "policy": policy},
)
return server, remote

yield _generate_server_and_remote


@pytest.mark.parallel
def test_acs_validation_and_update(
file_bindings,
Expand Down
18 changes: 18 additions & 0 deletions pulpcore/app/migrations/0126_remoteartifact_failed_at.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.2.16 on 2024-11-27 15:06

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('core', '0125_openpgpdistribution_openpgpkeyring_openpgppublickey_and_more'),
]

operations = [
migrations.AddField(
model_name='remoteartifact',
name='failed_at',
field=models.DateTimeField(null=True),
),
]
2 changes: 2 additions & 0 deletions pulpcore/app/models/content.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,7 @@ class RemoteArtifact(BaseModel, QueryMixin):
sha256 (models.CharField): The expected SHA-256 checksum of the file.
sha384 (models.CharField): The expected SHA-384 checksum of the file.
sha512 (models.CharField): The expected SHA-512 checksum of the file.
failed_at (models.DateTimeField): The datetime of last download attempt failure.
Relations:
Expand All @@ -721,6 +722,7 @@ class RemoteArtifact(BaseModel, QueryMixin):
sha256 = models.CharField(max_length=64, null=True, db_index=True)
sha384 = models.CharField(max_length=96, null=True, db_index=True)
sha512 = models.CharField(max_length=128, null=True, db_index=True)
failed_at = models.DateTimeField(null=True)

content_artifact = models.ForeignKey(ContentArtifact, on_delete=models.CASCADE)
remote = models.ForeignKey("Remote", on_delete=models.CASCADE)
Expand Down
5 changes: 5 additions & 0 deletions pulpcore/app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,11 @@
"EXPIRES_TTL": 600, # 10 minutes
}

# The time a RemoteArtifact will be ignored after failure.
# In on-demand, if a fetching content from a remote failed due to corrupt data,
# the corresponding RemoteArtifact will be ignored for that time (seconds).
REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN = 5 * 60 # 5 minutes

SPECTACULAR_SETTINGS = {
"SERVE_URLCONF": ROOT_URLCONF,
"DEFAULT_GENERATOR_CLASS": "pulpcore.openapi.PulpSchemaGenerator",
Expand Down
35 changes: 24 additions & 11 deletions pulpcore/content/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import struct
from gettext import gettext as _
from functools import lru_cache
from datetime import timedelta

from aiohttp.client_exceptions import ClientResponseError, ClientConnectionError
from aiohttp.web import FileResponse, StreamResponse, HTTPOk
Expand All @@ -23,6 +24,7 @@
from asgiref.sync import sync_to_async

import django
from django.utils import timezone

from opentelemetry import metrics

Expand Down Expand Up @@ -842,22 +844,25 @@ async def _stream_content_artifact(self, request, response, content_artifact):
[pulpcore.plugin.models.ContentArtifact][] returned the binary data needed for
the client.
"""
# We should only retry with exceptions that happen before we receive any data
# We should only skip exceptions that happen before we receive any data
# and start streaming, as we can't rollback data if something happens after that.
RETRYABLE_EXCEPTIONS = (
SKIPPABLE_EXCEPTIONS = (
ClientResponseError,
UnsupportedDigestValidationError,
ClientConnectionError,
)

remote_artifacts = content_artifact.remoteartifact_set.select_related(
"remote"
).order_by_acs()
protection_time = settings.REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN
remote_artifacts = (
content_artifact.remoteartifact_set.select_related("remote")
.order_by_acs()
.exclude(failed_at__gte=timezone.now() - timedelta(seconds=protection_time))
)
async for remote_artifact in remote_artifacts:
try:
response = await self._stream_remote_artifact(request, response, remote_artifact)
return response
except RETRYABLE_EXCEPTIONS as e:
except SKIPPABLE_EXCEPTIONS as e:
log.warning(
"Could not download remote artifact at '{}': {}".format(
remote_artifact.url, str(e)
Expand Down Expand Up @@ -1161,14 +1166,22 @@ async def finalize():
try:
download_result = await downloader.run()
except DigestValidationError:
remote_artifact.failed_at = timezone.now()
await remote_artifact.asave()
await downloader.session.close()
close_tcp_connection(request.transport._sock)
REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN = settings.REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN
raise RuntimeError(
f"We tried streaming {remote_artifact.url!r} to the client, but it "
"failed checkusm validation. "
"At this point, we cant recover from wrong data already sent, "
"so we are forcing the connection to close. "
"If this error persists, the remote server might be corrupted."
f"Pulp tried streaming {remote_artifact.url!r} to "
"the client, but it failed checksum validation.\n\n"
"We can't recover from wrong data already sent so we are:\n"
"- Forcing the connection to close.\n"
"- Marking this Remote to be ignored for "
f"{REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN=}s.\n\n"
"If the Remote is known to be fixed, try resyncing the associated repository.\n"
"If the Remote is known to be permanently corrupted, try removing "
"affected Pulp Remote, adding a good one and resyncing.\n"
"If the problem persists, please contact the Pulp team."
)

if content_length := response.headers.get("Content-Length"):
Expand Down
121 changes: 108 additions & 13 deletions pulpcore/tests/functional/api/using_plugin/test_content_delivery.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
"""Tests related to content delivery."""

from aiohttp.client_exceptions import ClientResponseError, ClientPayloadError
import hashlib
import pytest
import subprocess
import uuid
from urllib.parse import urljoin

from pulpcore.client.pulp_file import (
RepositorySyncURL,
)
import pytest
from aiohttp.client_exceptions import ClientPayloadError, ClientResponseError

from pulpcore.client.pulp_file import RepositorySyncURL
from pulpcore.tests.functional.utils import download_file, get_files_in_manifest


Expand Down Expand Up @@ -116,8 +115,13 @@ def test_remote_content_changed_with_on_demand(
):
"""
GIVEN a remote synced on demand with fileA (e.g, digest=123),
WHEN on the remote server, fileA changed its content (e.g, digest=456),
THEN retrieving fileA from the content app will cause a connection-close/incomplete-response.
AND the remote server, fileA changed its content (e.g, digest=456),
WHEN the client first requests that content
THEN the content app will start a response but close the connection before finishing
WHEN the client requests that content again (within the RA cooldown interval)
THEN the content app will return a 404
"""
# GIVEN
basic_manifest_path = write_3_iso_file_fixture_data_factory("basic")
Expand All @@ -129,17 +133,108 @@ def test_remote_content_changed_with_on_demand(
repo = file_bindings.RepositoriesFileApi.read(file_repo_with_auto_publish.pulp_href)
distribution = file_distribution_factory(repository=repo.pulp_href)
expected_file_list = list(get_files_in_manifest(remote.url))

# WHEN
write_3_iso_file_fixture_data_factory("basic", overwrite=True)

# THEN
get_url = urljoin(distribution.base_url, expected_file_list[0][0])
with pytest.raises(ClientPayloadError, match="Response payload is not completed"):
download_file(get_url)

# Assert again with curl just to be sure.
# WHEN (first request)
result = subprocess.run(["curl", "-v", get_url], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

# THEN
assert result.returncode == 18
assert b"* Closing connection 0" in result.stderr
assert b"curl: (18) transfer closed with outstanding read data remaining" in result.stderr

# WHEN (second request)
result = subprocess.run(["curl", "-v", get_url], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

# THEN
assert result.returncode == 0
assert b"< HTTP/1.1 404 Not Found" in result.stderr


@pytest.mark.parallel
def test_handling_remote_artifact_on_demand_streaming_failure(
write_3_iso_file_fixture_data_factory,
file_repo_with_auto_publish,
file_remote_factory,
file_bindings,
monitor_task,
monitor_task_group,
file_distribution_factory,
gen_object_with_cleanup,
generate_server_and_remote,
):
"""
GIVEN A content synced with on-demand which has 2 RemoteArtifacts (Remote + ACS).
AND Only the ACS RemoteArtifact (that has priority on the content-app) is corrupted
WHEN a client requests the content for the first time
THEN the client doesnt get any content
WHEN a client requests the content for the second time
THEN the client gets the right content
"""

# Plumbing
def create_simple_remote(manifest_path):
remote = file_remote_factory(manifest_path=manifest_path, policy="on_demand")
body = RepositorySyncURL(remote=remote.pulp_href)
monitor_task(
file_bindings.RepositoriesFileApi.sync(file_repo_with_auto_publish.pulp_href, body).task
)
return remote

def create_acs_remote(manifest_path):
acs_server, acs_remote = generate_server_and_remote(
manifest_path=manifest_path, policy="on_demand"
)
acs = gen_object_with_cleanup(
file_bindings.AcsFileApi,
{"remote": acs_remote.pulp_href, "paths": [], "name": str(uuid.uuid4())},
)
monitor_task_group(file_bindings.AcsFileApi.refresh(acs.pulp_href).task_group)
return acs

def sync_publish_and_distribute(remote):
body = RepositorySyncURL(remote=remote.pulp_href)
monitor_task(
file_bindings.RepositoriesFileApi.sync(file_repo_with_auto_publish.pulp_href, body).task
)
repo = file_bindings.RepositoriesFileApi.read(file_repo_with_auto_publish.pulp_href)
distribution = file_distribution_factory(repository=repo.pulp_href)
return distribution

def refresh_acs(acs):
monitor_task_group(file_bindings.AcsFileApi.refresh(acs.pulp_href).task_group)
return acs

def get_original_content_info(remote):
expected_files = get_files_in_manifest(remote.url)
content_unit = list(expected_files)[0]
return content_unit[0], content_unit[1]

def download_from_distribution(content, distribution):
content_unit_url = urljoin(distribution.base_url, content_name)
downloaded_file = download_file(content_unit_url)
actual_checksum = hashlib.sha256(downloaded_file.body).hexdigest()
return actual_checksum

# GIVEN
basic_manifest_path = write_3_iso_file_fixture_data_factory("basic", seed=123)
acs_manifest_path = write_3_iso_file_fixture_data_factory("acs", seed=123)
remote = create_simple_remote(basic_manifest_path)
distribution = sync_publish_and_distribute(remote)
acs = create_acs_remote(acs_manifest_path)
refresh_acs(acs)
write_3_iso_file_fixture_data_factory("acs", overwrite=True) # corrupt

# WHEN/THEN (first request)
content_name, expected_checksum = get_original_content_info(remote)

with pytest.raises(ClientPayloadError, match="Response payload is not completed"):
download_from_distribution(content_name, distribution)

# WHEN/THEN (second request)
actual_checksum = download_from_distribution(content_name, distribution)
assert actual_checksum == expected_checksum
9 changes: 7 additions & 2 deletions pulpcore/tests/functional/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import asyncio
import hashlib
import os
import random

from aiohttp import web
from dataclasses import dataclass
Expand Down Expand Up @@ -103,10 +104,14 @@ async def _download_file(url, auth=None, headers=None):
return MockDownload(body=await response.read(), response_obj=response)


def generate_iso(full_path, size=1024, relative_path=None):
def generate_iso(full_path, size=1024, relative_path=None, seed=None):
"""Generate a random file."""
with open(full_path, "wb") as fout:
contents = os.urandom(size)
if seed:
random.seed(seed)
contents = random.randbytes(size)
else:
contents = os.urandom(size)
fout.write(contents)
fout.flush()
digest = hashlib.sha256(contents).hexdigest()
Expand Down

0 comments on commit 62ff400

Please sign in to comment.