Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/+dataless_manifests.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Re-added backward compatibility for manifests that have not yet been migrated to the new `data` field. Sync will try to update them to the new format and repair manifests that are missing their artifact, but it is recommended that you run the `pulpcore-manager container-handle-image-data` command to fix all manifests at once. The command has been updated to report broken manifests by repository. Broken manifests in pushed repositories need to be deleted and re-pushed.
8 changes: 7 additions & 1 deletion pulp_container/app/downloaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

from pulpcore.plugin.download import DownloaderFactory, HttpDownloader

from pulp_container.constants import V2_ACCEPT_HEADERS

log = getLogger(__name__)

HeadResult = namedtuple(
Expand Down Expand Up @@ -51,7 +53,11 @@ async def _run(self, handle_401=True, extra_data=None):
handle_401(bool): If true, catch 401, request a new token and retry.

"""
headers = {}
# manifests are header sensitive, blobs do not care
# these accept headers are going to be sent with every request to ensure downloader
# can download manifests, namely in the repair core task
# FIXME this can be rolled back after https://github.com/pulp/pulp_container/issues/1288
headers = V2_ACCEPT_HEADERS
repo_name = None
if extra_data is not None:
headers = extra_data.get("headers", headers)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from contextlib import suppress
from collections import defaultdict
from gettext import gettext as _
from json.decoder import JSONDecodeError

Expand All @@ -8,6 +8,7 @@
from django.db.models import Q

from pulpcore.plugin.cache import SyncContentCache
from pulpcore.plugin.util import get_url

from pulp_container.app.models import ContainerDistribution, Manifest
from pulp_container.app.utils import get_content_data
Expand All @@ -34,6 +35,7 @@ class Command(BaseCommand):

def handle(self, *args, **options):
manifests_updated_count = 0
self._broken_manifests = []

manifests_v1 = Manifest.objects.filter(
Q(media_type=MEDIA_TYPE.MANIFEST_V1),
Expand Down Expand Up @@ -68,6 +70,23 @@ def handle(self, *args, **options):
self.style.SUCCESS("Successfully updated %d manifests." % manifests_updated_count)
)

if self._broken_manifests:
self.stdout.write(
self.style.WARNING("Found %d broken manifests." % len(self._broken_manifests))
)
broken_by_repo = defaultdict(list)
for manifest in self._broken_manifests:
repos = manifest.repositories.all()
if repos:
for repo in repos:
broken_by_repo[get_url(repo)].append(get_url(manifest))
else:
broken_by_repo["orphaned"].append(get_url(manifest))
for repo_url, manifests in broken_by_repo.items():
self.stdout.write(self.style.WARNING(" %s" % repo_url))
for manifest_url in manifests:
self.stdout.write(self.style.WARNING(" %s" % manifest_url))
Comment thread
dralley marked this conversation as resolved.

if settings.CACHE_ENABLED and manifests_updated_count != 0:
base_paths = ContainerDistribution.objects.values_list("base_path", flat=True)
if base_paths:
Expand All @@ -91,11 +110,13 @@ def update_manifests(self, manifests_qs):
]

for manifest in manifests_qs.iterator():
# suppress non-existing/already migrated artifacts and corrupted JSON files
with suppress(ObjectDoesNotExist, JSONDecodeError):
try:
needs_update = self.init_manifest(manifest)
if needs_update:
manifests_to_update.append(manifest)
except (ObjectDoesNotExist, JSONDecodeError):
self._broken_manifests.append(manifest)
continue
if needs_update:
manifests_to_update.append(manifest)

if len(manifests_to_update) > 1000:
manifests_qs.model.objects.bulk_update(
Expand Down
42 changes: 42 additions & 0 deletions pulp_container/app/redirects.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.http import Http404
from django.shortcuts import redirect

from pulp_container.app.exceptions import ManifestNotFound
Expand Down Expand Up @@ -101,6 +102,47 @@ def redirect_to_object_storage(self, artifact, return_media_type):
)
return redirect(content_url)

# TODO: BACKWARD COMPATIBILITY - remove after fully migrating to artifactless manifests
def redirect_to_artifact(self, content_name, manifest, manifest_media_type):
    """
    Redirect the client to the artifact stored for ``manifest``.

    Args:
        content_name: Identifier interpolated into the not-found message (callers in
            this class pass either a tag name or a manifest digest).
        manifest: Manifest whose ``_artifacts`` relation is queried for its artifact.
        manifest_media_type: Media type handed on to ``redirect_to_object_storage``.

    Returns:
        Whatever ``redirect_to_object_storage`` returns for the found artifact.

    Raises:
        Http404: When the artifact lookup raises ``ObjectDoesNotExist``.
    """
    try:
        found_artifact = manifest._artifacts.get()
    except ObjectDoesNotExist:
        raise Http404(f"An artifact for '{content_name}' was not found")
    else:
        return self.redirect_to_object_storage(found_artifact, manifest_media_type)

def issue_tag_redirect(self, tag):
    """
    Redirect a tag request, falling back to the manifest's artifact when it has no data.

    A tagged manifest with non-empty ``data`` is handled by the parent implementation.
    Otherwise:

    * a manifest of type ``MEDIA_TYPE.MANIFEST_V1`` is redirected to its artifact and
      advertised as ``MEDIA_TYPE.MANIFEST_V1_SIGNED``;
    * any other manifest is redirected to its artifact only if its media type is among
      those returned by ``get_accepted_media_types`` for the request headers.

    Raises:
        ManifestNotFound: When the manifest's media type is not accepted by the client.
    """
    tagged = tag.tagged_manifest
    if tagged.data:
        return super().issue_tag_redirect(tag)

    media_type = tagged.media_type
    if media_type == MEDIA_TYPE.MANIFEST_V1:
        return self.redirect_to_artifact(tag.name, tagged, MEDIA_TYPE.MANIFEST_V1_SIGNED)

    # the accepted types are only computed once the V1 shortcut has been ruled out
    if media_type not in get_accepted_media_types(self.request.headers):
        raise ManifestNotFound(reference=tag.name)

    return self.redirect_to_artifact(tag.name, tagged, media_type)

def issue_manifest_redirect(self, manifest):
    """
    Redirect a by-digest manifest request.

    A manifest with empty ``data`` is served through its artifact, using the
    manifest's own digest and media type; otherwise the parent implementation is used.
    """
    if not manifest.data:
        return self.redirect_to_artifact(manifest.digest, manifest, manifest.media_type)

    return super().issue_manifest_redirect(manifest)

# END OF BACKWARD COMPATIBILITY


class AzureStorageRedirects(S3StorageRedirects):
"""
Expand Down
44 changes: 44 additions & 0 deletions pulp_container/app/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ async def get_tag(self, request):
"Content-Type": return_media_type,
"Docker-Content-Digest": tag.tagged_manifest.digest,
}
# TODO: BACKWARD COMPATIBILITY - remove after fully migrating to artifactless manifest
if not tag.tagged_manifest.data:
return await self.dispatch_tag(request, tag, response_headers)
# END OF BACKWARD COMPATIBILITY
return web.Response(body=tag.tagged_manifest.data, headers=response_headers)

# return what was found in case media_type is accepted header (docker, oci)
Expand All @@ -206,11 +210,41 @@ async def get_tag(self, request):
"Content-Type": return_media_type,
"Docker-Content-Digest": tag.tagged_manifest.digest,
}
# TODO: BACKWARD COMPATIBILITY - remove after fully migrating to artifactless manifest
if not tag.tagged_manifest.data:
return await self.dispatch_tag(request, tag, response_headers)
# END OF BACKWARD COMPATIBILITY
return web.Response(body=tag.tagged_manifest.data, headers=response_headers)

# return 404 in case the client is requesting docker manifest v2 schema 1
raise PathNotResolved(tag_name)

# TODO: BACKWARD COMPATIBILITY - remove after fully migrating to artifactless manifest
async def dispatch_tag(self, request, tag, response_headers):
    """
    Serve the manifest behind ``tag`` from its artifact, or stream it when none is stored.

    Args:
        request(:class:`~aiohttp.web.Request`): The request to prepare a response for.
        tag: Tag whose ``tagged_manifest`` should be served.
        response_headers (dict): Headers passed to ``Registry._dispatch`` when an
            artifact is found (callers in this file supply 'Content-Type' and
            'Docker-Content-Digest').

    Returns:
        The result of ``Registry._dispatch`` when the manifest has an artifact,
        otherwise the result of ``self._stream_content_artifact`` for the manifest's
        first content artifact.

    """
    try:
        # keep the relation access inside the try block, as any ObjectDoesNotExist
        # raised here has to take the streaming path below
        saved_artifact = await tag.tagged_manifest._artifacts.aget()
    except ObjectDoesNotExist:
        pick_first = sync_to_async(lambda queryset: queryset[0])
        content_artifact = await pick_first(tag.tagged_manifest.contentartifact_set.all())
        return await self._stream_content_artifact(
            request, web.StreamResponse(), content_artifact
        )

    return await Registry._dispatch(saved_artifact, response_headers)
# END OF BACKWARD COMPATIBILITY

@RegistryContentCache(
base_key=lambda req, cac: Registry.find_base_path_cached(req, cac),
auth=lambda req, cac, bk: Registry.auth_cached(req, cac, bk),
Expand Down Expand Up @@ -247,6 +281,16 @@ async def get_by_digest(self, request):
"Content-Type": manifest.media_type,
"Docker-Content-Digest": manifest.digest,
}
# TODO: BACKWARD COMPATIBILITY - remove after migrating to artifactless manifest
if not manifest.data:
if saved_artifact := await manifest._artifacts.afirst():
return await Registry._dispatch(saved_artifact, headers)
else:
ca = await sync_to_async(lambda x: x[0])(manifest.contentartifact_set.all())
return await self._stream_content_artifact(
request, web.StreamResponse(), ca
)
# END OF BACKWARD COMPATIBILITY
return web.Response(body=manifest.data, headers=headers)
elif content_type == "blobs":
ca = await ContentArtifact.objects.select_related("artifact", "content").aget(
Expand Down
22 changes: 18 additions & 4 deletions pulp_container/app/tasks/sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import hashlib

from aiofiles import tempfile
from asgiref.sync import sync_to_async
from django.conf import settings
from django.db.models import Q

Expand Down Expand Up @@ -101,10 +102,23 @@ async def create_signature(manifest, reference, signing_service):
"""
async with semaphore:
# download and write file for object storage
async with tempfile.NamedTemporaryFile(dir=".", mode="wb", delete=False) as tf:
await tf.write(manifest.data.encode("utf-8"))
await tf.flush()
manifest_path = tf.name
if not manifest.data:
# TODO: BACKWARD COMPATIBILITY - remove after fully migrating to artifactless manifest
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Fully migrating" at this point should include making the manifest.data field not null, so the migration actually fails if it's not yet done.

Copy link
Copy Markdown
Contributor

@dralley dralley Jun 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

artifact = await manifest._artifacts.aget()
if settings.STORAGES["default"]["BACKEND"] != "pulpcore.app.models.storage.FileSystem":
async with tempfile.NamedTemporaryFile(dir=".", mode="wb", delete=False) as tf:
await tf.write(await sync_to_async(artifact.file.read)())
await tf.flush()
artifact.file.close()
manifest_path = tf.name
else:
manifest_path = artifact.file.path
# END OF BACKWARD COMPATIBILITY
else:
async with tempfile.NamedTemporaryFile(dir=".", mode="wb", delete=False) as tf:
await tf.write(manifest.data.encode("utf-8"))
await tf.flush()
manifest_path = tf.name

async with tempfile.NamedTemporaryFile(dir=".", prefix="signature") as tf:
sig_path = tf.name
Expand Down
66 changes: 53 additions & 13 deletions pulp_container/app/tasks/sync_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
determine_media_type,
extract_data_from_signature,
filter_resources,
get_content_data,
urlpath_sanitize,
validate_manifest,
)
Expand Down Expand Up @@ -83,11 +84,28 @@ async def _check_for_existing_manifest(self, download_tag):

digest = response.headers.get("docker-content-digest")

if manifest := await Manifest.objects.filter(
digest=digest, pulp_domain=get_domain()
).afirst():
raw_text_data = manifest.data
content_data = json.loads(raw_text_data)
if (
manifest := await Manifest.objects.prefetch_related("contentartifact_set")
.filter(digest=digest, pulp_domain=get_domain())
.afirst()
):
if raw_text_data := manifest.data:
content_data = json.loads(raw_text_data)

# TODO: BACKWARD COMPATIBILITY - remove after fully migrating to artifactless manifest
elif saved_artifact := await manifest._artifacts.afirst():
content_data, raw_bytes_data = await sync_to_async(get_content_data)(saved_artifact)
raw_text_data = raw_bytes_data.decode("utf-8")
# if artifact is not available (due to reclaim space) we will download it again
else:
content_data, raw_text_data, response = await self._download_manifest_data(
response.url
)
if manifest.data is None:
manifest.data = raw_text_data
await manifest.asave(update_fields=["data"])
Copy link
Copy Markdown
Contributor

@dralley dralley Jun 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I consider this a low risk compared to the existing issues, but in theory, since we are mutating content, it could potentially deadlock between two parallel syncs.

Not a blocker, just a note.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well hopefully after running the command, this piece won't be executed often. Also since manifests are per image and each image gets its own repo, it's unlikely to have parallel syncs of the same two images going at the same time.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree

# END OF BACKWARD COMPATIBILITY

else:
content_data, raw_text_data, response = await self._download_manifest_data(response.url)

Expand Down Expand Up @@ -364,7 +382,9 @@ async def get_paginated_tag_list(self, rel_link, repo_name):
while True:
link = urljoin(self.remote.url, rel_link)
list_downloader = self.remote.get_downloader(url=link)
await list_downloader.run(extra_data={"repo_name": repo_name})
# FIXME this can be rolled back after https://github.com/pulp/pulp_container/issues/1288
# tags/list endpoint does not like any unnecessary headers to be sent
await list_downloader.run(extra_data={"repo_name": repo_name, "headers": {}})
with open(list_downloader.path) as tags_raw:
tags_dict = json.loads(tags_raw.read())
tag_list.extend(tags_dict["tags"])
Expand Down Expand Up @@ -507,12 +527,30 @@ async def create_listed_manifest(self, manifest_data):
)
manifest_url = urljoin(self.remote.url, relative_url)

if manifest := await Manifest.objects.filter(
digest=digest, pulp_domain=get_domain()
).afirst():
content_data = json.loads(manifest.data)

content_data, manifest = await self._download_and_instantiate_manifest(manifest_url, digest)
if (
manifest := await Manifest.objects.prefetch_related("contentartifact_set")
.filter(digest=digest, pulp_domain=get_domain())
.afirst()
):
if manifest.data:
content_data = json.loads(manifest.data)
# TODO: BACKWARD COMPATIBILITY - remove after fully migrating to artifactless manifest
elif saved_artifact := await manifest._artifacts.afirst():
content_data, raw_bytes_data = await sync_to_async(get_content_data)(saved_artifact)
manifest.data = raw_bytes_data.decode("utf-8")
await manifest.asave(update_fields=["data"])
# if artifact is not available (due to reclaim space) we will download it again
else:
content_data, new_manifest = await self._download_and_instantiate_manifest(
Copy link
Copy Markdown
Contributor

@dralley dralley Jun 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So before, we were taking the returned manifest value directly (which AFAICT is an unsaved model object) and throwing away the previous saved manifest. Presumably later on the QueryExistingContents stage would swap the model back, unless it's actually meaningfully different. And so substantively it's only doing a download of content_data

And in the new version, we are taking an existing manifest from above, overwriting the data field, and then re-saving. And so it's still mostly about the download, but instead of the manifest getting swapped with the original later we're just updating the original now and passing it along.

Right? This is subtle, not sure if I have it correct.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's right, we don't have the artifact so we need to download the data and update the manifest.

manifest_url, digest
)
manifest.data = new_manifest.data
await manifest.asave(update_fields=["data"])
# END OF BACKWARD COMPATIBILITY
else:
content_data, manifest = await self._download_and_instantiate_manifest(
manifest_url, digest
)

# in oci-index spec, platform is an optional field
platform = manifest_data.get("platform", None)
Expand Down Expand Up @@ -603,7 +641,9 @@ async def create_signatures(self, man_dc, signature_source):
man_dc.content.digest,
)
signatures_downloader = self.remote.get_downloader(url=signatures_url)
await signatures_downloader.run()
# FIXME this can be rolled back after https://github.com/pulp/pulp_container/issues/1288
# signature extensions endpoint does not like any unnecessary headers to be sent
await signatures_downloader.run(extra_data={"headers": {}})
with open(signatures_downloader.path) as signatures_fd:
api_extension_signatures = json.loads(signatures_fd.read())
for signature in api_extension_signatures.get("signatures", []):
Expand Down