Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PR #6064/0a5ac4aa backport][3.63] [SAT-29018] Fix/corrupted RA blocks content streaming #6161

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .github/workflows/scripts/post_before_script.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/bash

set -euv

# # See pulpcore.app.util.ENABLE_6064_BACKPORT_WORKAROUND for context.
# This needs to be set here because it relies on service init.
# Its being tested in only one scenario to have both cases covered.
if [[ "$TEST" == "s3" ]]; then
cmd_prefix pulpcore-manager backport-patch-6064
fi

8 changes: 8 additions & 0 deletions CHANGES/5725.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
On a request for on-demand content in the content app, a corrupted Remote that
contains the wrong binary (for that content) prevented other Remotes from being
attempted on future requests. Now the last failed Remotes are temporarily ignored
and others may be picked.

Because the [original](https://github.com/pulp/pulpcore/pull/6064) contains a migraton,
this is backported here as an optional patch which can be enabled by running the
pulpcore-manager command: `backport-patch-6064`.
24 changes: 20 additions & 4 deletions pulp_file/pytest_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ def file_fixtures_root(tmp_path):

@pytest.fixture
def write_3_iso_file_fixture_data_factory(file_fixtures_root):
def _write_3_iso_file_fixture_data_factory(name, overwrite=False):
def _write_3_iso_file_fixture_data_factory(name, overwrite=False, seed=None):
file_fixtures_root.joinpath(name).mkdir(exist_ok=overwrite)
file1 = generate_iso(file_fixtures_root.joinpath(f"{name}/1.iso"))
file2 = generate_iso(file_fixtures_root.joinpath(f"{name}/2.iso"))
file3 = generate_iso(file_fixtures_root.joinpath(f"{name}/3.iso"))
file1 = generate_iso(file_fixtures_root.joinpath(f"{name}/1.iso"), seed=seed)
file2 = generate_iso(file_fixtures_root.joinpath(f"{name}/2.iso"), seed=seed)
file3 = generate_iso(file_fixtures_root.joinpath(f"{name}/3.iso"), seed=seed)
generate_manifest(
file_fixtures_root.joinpath(f"{name}/PULP_MANIFEST"), [file1, file2, file3]
)
Expand Down Expand Up @@ -362,3 +362,19 @@ def _wget_recursive_download_on_host(url, destination):
)

return _wget_recursive_download_on_host


@pytest.fixture
def generate_server_and_remote(
file_bindings, gen_fixture_server, file_fixtures_root, gen_object_with_cleanup
):
def _generate_server_and_remote(*, manifest_path, policy):
server = gen_fixture_server(file_fixtures_root, None)
url = server.make_url(manifest_path)
remote = gen_object_with_cleanup(
file_bindings.RemotesFileApi,
{"name": str(uuid.uuid4()), "url": str(url), "policy": policy},
)
return server, remote

yield _generate_server_and_remote
16 changes: 0 additions & 16 deletions pulp_file/tests/functional/api/test_acs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,6 @@
)


@pytest.fixture
def generate_server_and_remote(
file_bindings, gen_fixture_server, file_fixtures_root, gen_object_with_cleanup
):
def _generate_server_and_remote(*, manifest_path, policy):
server = gen_fixture_server(file_fixtures_root, None)
url = server.make_url(manifest_path)
remote = gen_object_with_cleanup(
file_bindings.RemotesFileApi,
{"name": str(uuid.uuid4()), "url": str(url), "policy": policy},
)
return server, remote

yield _generate_server_and_remote


@pytest.mark.parallel
def test_acs_validation_and_update(
file_bindings,
Expand Down
16 changes: 16 additions & 0 deletions pulpcore/app/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,11 @@ def ready(self):
sender=self,
dispatch_uid="populate_artifact_serving_distribution_identifier",
)
post_migrate.connect(
_conditional_6064_backport_workaround,
sender=self,
dispatch_uid="conditional_6064_backport_workaround",
)


def _populate_access_policies(sender, apps, verbosity, **kwargs):
Expand Down Expand Up @@ -416,3 +421,14 @@ def _populate_artifact_serving_distribution(sender, apps, verbosity, **kwargs):
pulp_type="core.artifact",
defaults={"base_path": name, "content_guard": content_guard},
)


def _conditional_6064_backport_workaround(sender, apps, verbosity, **kwargs):
# See context in pulpcore.app.util.ENABLE_6064_BACKPORT_WORKAROUND
from pulpcore.app.models import RemoteArtifact
from django.db import models
import pulpcore.app.util

if pulpcore.app.util.failed_at_exists(connection, RemoteArtifact):
pulpcore.app.util.ENABLE_6064_BACKPORT_WORKAROUND = True
RemoteArtifact.add_to_class("failed_at", models.DateTimeField(null=True))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What exactly does this do?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know about the implementation, but the effect is like adding the field dynamically.
For example, django will be able to use the filter RemoteArtifact.objects.exclude(failed_at__gte=Y). If the field really exist in the database, it succeeds, otherwise it raises a ProgrammingError.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And something in this PR is altering the actual db table, so this can be used?
That feels like reinventing the whole db migrations framework with out the safeguards. A subsequent upgrade is then probably going to fail. When we said "You cannot backport a migration.", that meant you cannot add db altering code to a release branch assuming that all db alteration would be done by a migration in the django framework. My gut feeling is this is way too dangerous.
Can you think of a solution that does not require changing the db schema? We should be lucky by the fact that this is kind of ephemeral data.

  • Would it help to keep a per-worker list (maybe a bloom filter) in memory?
  • Can we repurpose another datetime field that we don't rely on there?
  • Can we use redis?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A subsequent upgrade is then probably going to fail.

My assumption was that a field addition (that doesn't have any other couplings) would be safe. But I can see this is a sensitive area. I'll explore those alternatives.

(I had though of per-worker cache, but concluded it would be simpler to use the db - before knowing about the backport problem).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for understanding my concerns.
At this point I think postgres may even reject to apply the migration on top of this out of bounds change.
If I could choose, i'd prefer the per worker in memory caching solution. Even if it would only solve the problem half way.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About the idea of repurposing another field, there is pulp_created and pulp_last_updated.
But I'm afraid of unexpected side-effects, like pulp_last_updated being updated by something and cooling down a good remote.
Or something else (thus, unexpected), because those are in the system for so long.

pulp_created         | timestamp with time zone |           | not null |
pulp_last_updated    | timestamp with time zone |           |          |

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not about a Remote, but the RemoteArtifact, right? I'm not so concerned as this class is only used internally and never visible to the user. I highly doubt that we have any logic depending on it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, RAs.
Well, I'll open a PR. If nothing bad happens during tests I think we are good.
Upgrading with the workaround of using another field name worked, but this would be preferable.

77 changes: 77 additions & 0 deletions pulpcore/app/management/commands/backport-patch-6064.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from django.core.management.base import BaseCommand
from gettext import gettext as _
from django.db import connection
from pulpcore.app.models import RemoteArtifact


CHECK_COL_QUERY = """
SELECT COUNT(*)
FROM information_schema.columns
WHERE table_name = %s
AND column_name = %s;
"""

MODIFY_QUERY_TMPL = """
ALTER TABLE {}
ADD COLUMN {} TIMESTAMPTZ DEFAULT NULL;
"""

HELP = _(
"""
Enables patch backport of #6064 (https://github.com/pulp/pulpcore/pull/6064).

The fix prevents corrupted remotes from making content unreacahble by adding
a cooldown time, which is tracked by a new field, 'RemoteArtifact.failed_at'.
This command adds the field to the appropriate table.
"""
)


class Command(BaseCommand):
help = HELP

def add_arguments(self, parser):
parser.add_argument(
"--dry-run",
action="store_true",
help="Run the migration in dry-run mode without saving changes",
)

def handle(self, *args, **options):
dry_run = options.get("dry_run", False)
try:
with connection.cursor() as cursor:
# Check if column already exists
table_name = RemoteArtifact._meta.db_table
field_name = "failed_at"
cursor.execute(CHECK_COL_QUERY, [table_name, field_name])
field_exists = cursor.fetchone()[0] > 0
if field_exists:
self._print_success(f"Field '{table_name}.{field_name}' already exists.")
self._print_success("Nothing to be done")
return

# Add field to table
self._print_info(f"Adding {field_name!r} column to {table_name!r}...")
MODIFY_QUERY = MODIFY_QUERY_TMPL.format(table_name, field_name)
if not dry_run:
cursor.execute(MODIFY_QUERY)
self._print_success("Done")
else:
self._print_warn("[DRY-RUN] SQL that would be executed:")
self._print_info(MODIFY_QUERY)
except Exception as e:
self._print_error(f"Migration failed: {str(e)}")
raise

def _print_info(self, msg):
self.stdout.write(msg)

def _print_success(self, msg):
self.stdout.write(self.style.SUCCESS(msg))

def _print_error(self, msg):
self.stdout.write(self.style.ERROR(msg))

def _print_warn(self, msg):
self.stdout.write(self.style.WARNING(msg))
1 change: 1 addition & 0 deletions pulpcore/app/models/content.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,7 @@ class RemoteArtifact(BaseModel, QueryMixin):
sha256 (models.CharField): The expected SHA-256 checksum of the file.
sha384 (models.CharField): The expected SHA-384 checksum of the file.
sha512 (models.CharField): The expected SHA-512 checksum of the file.
failed_at (models.DateTimeField): The datetime of last download attempt failure.

Relations:

Expand Down
5 changes: 5 additions & 0 deletions pulpcore/app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,11 @@
"EXPIRES_TTL": 600, # 10 minutes
}

# The time a RemoteArtifact will be ignored after failure.
# In on-demand, if a fetching content from a remote failed due to corrupt data,
# the corresponding RemoteArtifact will be ignored for that time (seconds).
REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN = 5 * 60 # 5 minutes

SPECTACULAR_SETTINGS = {
"SERVE_URLCONF": ROOT_URLCONF,
"DEFAULT_GENERATOR_CLASS": "pulpcore.openapi.PulpSchemaGenerator",
Expand Down
22 changes: 22 additions & 0 deletions pulpcore/app/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,28 @@
from pulpcore.exceptions.validation import InvalidSignatureError


# Backport workaround for https://github.com/pulp/pulpcore/pull/6064
# If 'pulpcore-manager backport-patch-6064' was run, the field
# RemoteArtifact.failed_at will be set and the backport will take effect.
ENABLE_6064_BACKPORT_WORKAROUND = False


def failed_at_exists(connection, ra_class) -> bool:
"""Whtether 'failed_at' exists in the database."""
table_name = ra_class._meta.db_table
field_name = "failed_at"
CHECK_COL_QUERY = """
SELECT COUNT(*)
FROM information_schema.columns
WHERE table_name = %s
AND column_name = %s;
"""
with connection.cursor() as cursor:
cursor.execute(CHECK_COL_QUERY, [table_name, field_name])
field_exists = cursor.fetchone()[0] > 0
return field_exists


# a little cache so viewset_for_model doesn't have to iterate over every app every time
_model_viewset_cache = {}
STRIPPED_API_ROOT = settings.API_ROOT.strip("/")
Expand Down
40 changes: 32 additions & 8 deletions pulpcore/content/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import struct
from gettext import gettext as _
from functools import lru_cache
from datetime import timedelta

from aiohttp.client_exceptions import ClientResponseError, ClientConnectionError
from aiohttp.web import FileResponse, StreamResponse, HTTPOk
Expand All @@ -23,6 +24,7 @@
from asgiref.sync import sync_to_async

import django
from django.utils import timezone

from opentelemetry import metrics

Expand Down Expand Up @@ -58,6 +60,7 @@
MetricsEmitter,
get_domain,
cache_key,
ENABLE_6064_BACKPORT_WORKAROUND,
)

from pulpcore.exceptions import ( # noqa: E402
Expand Down Expand Up @@ -842,9 +845,9 @@ async def _stream_content_artifact(self, request, response, content_artifact):
[pulpcore.plugin.models.ContentArtifact][] returned the binary data needed for
the client.
"""
# We should only retry with exceptions that happen before we receive any data
# We should only skip exceptions that happen before we receive any data
# and start streaming, as we can't rollback data if something happens after that.
RETRYABLE_EXCEPTIONS = (
SKIPPABLE_EXCEPTIONS = (
ClientResponseError,
UnsupportedDigestValidationError,
ClientConnectionError,
Expand All @@ -853,11 +856,17 @@ async def _stream_content_artifact(self, request, response, content_artifact):
remote_artifacts = content_artifact.remoteartifact_set.select_related(
"remote"
).order_by_acs()

if ENABLE_6064_BACKPORT_WORKAROUND:
protection_time = settings.REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN
remote_artifacts = remote_artifacts.exclude(
failed_at__gte=timezone.now() - timedelta(seconds=protection_time)
)
async for remote_artifact in remote_artifacts:
try:
response = await self._stream_remote_artifact(request, response, remote_artifact)
return response
except RETRYABLE_EXCEPTIONS as e:
except SKIPPABLE_EXCEPTIONS as e:
log.warning(
"Could not download remote artifact at '{}': {}".format(
remote_artifact.url, str(e)
Expand Down Expand Up @@ -1161,14 +1170,29 @@ async def finalize():
try:
download_result = await downloader.run()
except DigestValidationError:
COOLDOWN_MSG = ""
if ENABLE_6064_BACKPORT_WORKAROUND:
remote_artifact.failed_at = timezone.now()
await remote_artifact.asave()
REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN = (
settings.REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN
)
COOLDOWN_MSG = (
"- Marking this Remote to be ignored for "
f"{REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN=}s.\n\n"
)
await downloader.session.close()
close_tcp_connection(request.transport._sock)
raise RuntimeError(
f"We tried streaming {remote_artifact.url!r} to the client, but it "
"failed checkusm validation. "
"At this point, we cant recover from wrong data already sent, "
"so we are forcing the connection to close. "
"If this error persists, the remote server might be corrupted."
f"Pulp tried streaming {remote_artifact.url!r} to "
"the client, but it failed checksum validation.\n\n"
"We can't recover from wrong data already sent so we are:\n"
"- Forcing the connection to close.\n"
f"{COOLDOWN_MSG}"
"If the Remote is known to be fixed, try resyncing the associated repository.\n"
"If the Remote is known to be permanently corrupted, try removing "
"affected Pulp Remote, adding a good one and resyncing.\n"
"If the problem persists, please contact the Pulp team."
)

if content_length := response.headers.get("Content-Length"):
Expand Down
Loading
Loading