Skip to content

Commit

Permalink
Manifest is now artifactless
Browse files Browse the repository at this point in the history
Objects of the Manifest class no longer store the manifest data (raw binary json data) in a
linked artifact. Instead, a new TextField called 'data' was added to the Manifest class itself.

closes #1288
  • Loading branch information
MichalPysik committed Apr 2, 2024
1 parent ef0a194 commit f3e4279
Show file tree
Hide file tree
Showing 12 changed files with 181 additions and 232 deletions.
1 change: 1 addition & 0 deletions CHANGES/1288.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Manifest model is now artifactless, the manifest data is stored in a new 'data' field instead.
39 changes: 39 additions & 0 deletions pulp_container/app/migrations/0039_manifest_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Generated by Django 4.2.10 on 2024-03-05 11:22

from django.db import migrations, models


def move_manifest_data(apps, schema_editor):
"""
Move the data from the manifest's linked file to the new data field.
"""
Manifest = apps.get_model("container", "Manifest")

for manifest in Manifest.objects.all():
artifact = manifest._artifacts.first()
if not artifact:
continue

file = artifact.file
json_data_text = file.read()
file.close()

manifest.data = json_data_text
manifest._artifacts.clear()
manifest.save()


class Migration(migrations.Migration):

dependencies = [
('container', '0038_add_manifest_metadata_fields'),
]

operations = [
migrations.AddField(
model_name='manifest',
name='data',
field=models.TextField(default=''),
),
migrations.RunPython(move_manifest_data),
]
2 changes: 2 additions & 0 deletions pulp_container/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class Manifest(Content):
digest (models.TextField): The manifest digest.
schema_version (models.IntegerField): The manifest schema version.
media_type (models.TextField): The manifest media type.
data (models.TextField): The manifest's data in text format.
annotations (models.JSONField): Metadata stored inside the image manifest.
labels (models.JSONField): Metadata stored inside the image configuration.
is_bootable (models.BooleanField): Indicates whether the image is bootable or not.
Expand All @@ -98,6 +99,7 @@ class Manifest(Content):
digest = models.TextField(db_index=True)
schema_version = models.IntegerField()
media_type = models.TextField(choices=MANIFEST_CHOICES)
data = models.TextField(default="")

annotations = models.JSONField(default=dict)
labels = models.JSONField(default=dict)
Expand Down
53 changes: 10 additions & 43 deletions pulp_container/app/redirects.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.http import Http404
from django.shortcuts import redirect

from pulp_container.app.exceptions import ManifestNotFound
Expand Down Expand Up @@ -29,11 +28,11 @@ def redirect_to_content_app(self, content_type, content_id):
f"{settings.CONTENT_ORIGIN}/pulp/container/{self.path}/{content_type}/{content_id}"
)


class FileStorageRedirects(CommonRedirects):
"""
A class which contains methods used for redirecting to the default django's file storage.
"""
def issue_manifest_redirect(self, manifest):
"""
Issue a redirect for the passed manifest.
"""
return self.redirect_to_content_app("manifests", manifest.digest)

def issue_tag_redirect(self, tag):
"""
Expand All @@ -48,11 +47,11 @@ def issue_tag_redirect(self, tag):

return self.redirect_to_content_app("manifests", tag.name)

def issue_manifest_redirect(self, manifest):
"""
Issue a redirect for the passed manifest.
"""
return self.redirect_to_content_app("manifests", manifest.digest)

class FileStorageRedirects(CommonRedirects):
"""
A class which contains methods used for redirecting to the default django's file storage.
"""

def issue_blob_redirect(self, blob):
"""
Expand All @@ -66,38 +65,6 @@ class S3StorageRedirects(CommonRedirects):
A class that implements methods for the direct retrieval of manifest objects.
"""

def issue_tag_redirect(self, tag):
"""
Issue a redirect if an accepted media type requires it or return not found if manifest
version is not supported.
"""
manifest_media_type = tag.tagged_manifest.media_type
if manifest_media_type == MEDIA_TYPE.MANIFEST_V1:
return self.redirect_to_artifact(
tag.name, tag.tagged_manifest, MEDIA_TYPE.MANIFEST_V1_SIGNED
)
elif manifest_media_type in get_accepted_media_types(self.request.headers):
return self.redirect_to_artifact(tag.name, tag.tagged_manifest, manifest_media_type)
else:
raise ManifestNotFound(reference=tag.name)

def issue_manifest_redirect(self, manifest):
"""
Directly redirect to an associated manifest's artifact.
"""
return self.redirect_to_artifact(manifest.digest, manifest, manifest.media_type)

def redirect_to_artifact(self, content_name, manifest, manifest_media_type):
"""
Search for the passed manifest's artifact and issue a redirect.
"""
try:
artifact = manifest._artifacts.get()
except ObjectDoesNotExist:
raise Http404(f"An artifact for '{content_name}' was not found")

return self.redirect_to_object_storage(artifact, manifest_media_type)

def issue_blob_redirect(self, blob):
"""
Redirect to the passed blob or stream content when an associated artifact is not present.
Expand Down
106 changes: 47 additions & 59 deletions pulp_container/app/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ async def get_tag(self, request):
"Docker-Content-Digest": digest,
"Docker-Distribution-API-Version": "registry/2.0",
}
return web.Response(text=raw_manifest, headers=headers)
return web.Response(text=raw_manifest.decode("utf-8"), headers=headers)
else:
raise PathNotResolved(tag_name)

Expand Down Expand Up @@ -176,7 +176,7 @@ async def get_tag(self, request):
"Docker-Content-Digest": digest,
"Docker-Distribution-API-Version": "registry/2.0",
}
return web.Response(text=raw_manifest, headers=headers)
return web.Response(text=raw_manifest.decode("utf-8"), headers=headers)

accepted_media_types = get_accepted_media_types(request.headers)

Expand All @@ -200,7 +200,7 @@ async def get_tag(self, request):
"Content-Type": return_media_type,
"Docker-Content-Digest": tag.tagged_manifest.digest,
}
return await self.dispatch_tag(request, tag, response_headers)
return web.Response(text=tag.tagged_manifest.data, headers=response_headers)

# return what was found in case media_type is accepted header (docker, oci)
if tag.tagged_manifest.media_type in accepted_media_types:
Expand All @@ -209,35 +209,11 @@ async def get_tag(self, request):
"Content-Type": return_media_type,
"Docker-Content-Digest": tag.tagged_manifest.digest,
}
return await self.dispatch_tag(request, tag, response_headers)
return web.Response(text=tag.tagged_manifest.data, headers=response_headers)

# return 404 in case the client is requesting docker manifest v2 schema 1
raise PathNotResolved(tag_name)

async def dispatch_tag(self, request, tag, response_headers):
"""
Finds an artifact associated with a Tag and sends it to the client, otherwise tries
to stream it.
Args:
request(:class:`~aiohttp.web.Request`): The request to prepare a response for.
tag: Tag
response_headers (dict): dictionary that contains the 'Content-Type' header to send
with the response
Returns:
:class:`aiohttp.web.StreamResponse` or :class:`aiohttp.web.FileResponse`: The response
streamed back to the client.
"""
try:
artifact = await tag.tagged_manifest._artifacts.aget()
except ObjectDoesNotExist:
ca = await sync_to_async(lambda x: x[0])(tag.tagged_manifest.contentartifact_set.all())
return await self._stream_content_artifact(request, web.StreamResponse(), ca)
else:
return await Registry._dispatch(artifact, response_headers)

@RegistryContentCache(
base_key=lambda req, cac: Registry.find_base_path_cached(req, cac),
auth=lambda req, cac, bk: Registry.auth_cached(req, cac, bk),
Expand All @@ -261,37 +237,45 @@ async def get_by_digest(self, request):
pending_manifests = repository.pending_manifests.values_list("pk")
pending_content = pending_blobs.union(pending_manifests)
content = repository_version.content | Content.objects.filter(pk__in=pending_content)
# "/pulp/container/{path:.+}/{content:(blobs|manifests)}/sha256:{digest:.+}"
content_type = request.match_info["content"]

try:
ca = await ContentArtifact.objects.select_related("artifact", "content").aget(
content__in=content, relative_path=digest
)
ca_content = await sync_to_async(ca.content.cast)()
if isinstance(ca_content, Blob):
media_type = BLOB_CONTENT_TYPE
else:
media_type = ca_content.media_type
headers = {
"Content-Type": media_type,
"Docker-Content-Digest": ca_content.digest,
}
if content_type == "manifests":
manifest = await Manifest.objects.aget(digest=digest)
headers = {
"Content-Type": manifest.media_type,
"Docker-Content-Digest": manifest.digest,
}
return web.Response(text=manifest.data, headers=headers)
elif content_type == "blobs":
ca = await ContentArtifact.objects.select_related("artifact", "content").aget(
content__in=content, relative_path=digest
)
ca_content = await sync_to_async(ca.content.cast)()
if isinstance(ca_content, Blob):
media_type = BLOB_CONTENT_TYPE
else:
media_type = ca_content.media_type
headers = {
"Content-Type": media_type,
"Docker-Content-Digest": ca_content.digest,
}
except ObjectDoesNotExist:
distribution = await distribution.acast()
if distribution.remote_id and distribution.pull_through_distribution_id:
pull_downloader = await PullThroughDownloader.create(
distribution, repository_version, path, digest
)

# "/pulp/container/{path:.+}/{content:(blobs|manifests)}/sha256:{digest:.+}"
content_type = request.match_info["content"]
if content_type == "manifests":
raw_manifest, digest, media_type = await pull_downloader.download_manifest()
headers = {
"Content-Type": media_type,
"Docker-Content-Digest": digest,
"Docker-Distribution-API-Version": "registry/2.0",
}
return web.Response(text=raw_manifest, headers=headers)
return web.Response(text=raw_manifest.decode("utf-8"), headers=headers)
elif content_type == "blobs":
# there might be a case where the client has all the manifest data in place
# and tries to download only missing blobs; because of that, only the reference
Expand All @@ -304,6 +288,7 @@ async def get_by_digest(self, request):
else:
raise PathNotResolved(path)
else:
# else branch can be reached only for blob
artifact = ca.artifact
if artifact:
return await Registry._dispatch(artifact, headers)
Expand Down Expand Up @@ -349,14 +334,14 @@ async def init_remote_blob(self):
async def download_manifest(self, run_pipeline=False):
response = await self.run_manifest_downloader()

with open(response.path) as f:
with open(response.path, mode="rb") as f:
raw_data = f.read()

response.artifact_attributes["file"] = response.path
saved_artifact = await save_artifact(response.artifact_attributes)
# response.artifact_attributes["file"] = response.path
# saved_artifact = await save_artifact(response.artifact_attributes)

if run_pipeline:
await self.run_pipeline(saved_artifact)
await self.run_pipeline(raw_data)

try:
manifest_data = json.loads(raw_data)
Expand All @@ -371,7 +356,7 @@ async def download_manifest(self, run_pipeline=False):
if media_type not in (MEDIA_TYPE.MANIFEST_LIST, MEDIA_TYPE.INDEX_OCI):
# add the manifest and blobs to the repository to be able to stream it
# in the next round when a client approaches the registry
await self.init_pending_content(digest, manifest_data, media_type, saved_artifact)
await self.init_pending_content(digest, manifest_data, raw_data, media_type)

return raw_data, digest, media_type

Expand All @@ -396,20 +381,20 @@ async def run_manifest_downloader(self):

return response

async def run_pipeline(self, saved_artifact):
async def run_pipeline(self, raw_manifest_data):
set_guid(generate_guid())
await sync_to_async(dispatch)(
download_image_data,
exclusive_resources=[self.repository_version.repository],
kwargs={
"repository_pk": self.repository_version.repository.pk,
"remote_pk": self.remote.pk,
"manifest_artifact_pk": saved_artifact.pk,
"text_manifest_data": raw_manifest_data.decode("utf-8"),
"tag_name": self.identifier,
},
)

async def init_pending_content(self, digest, manifest_data, media_type, artifact):
async def init_pending_content(self, digest, manifest_data, raw_data, media_type):
if config := manifest_data.get("config", None):
config_digest = config["digest"]
config_blob = await self.save_config_blob(config_digest)
Expand All @@ -419,11 +404,14 @@ async def init_pending_content(self, digest, manifest_data, media_type, artifact

manifest = Manifest(
digest=digest,
schema_version=2
if manifest_data["mediaType"] in (MEDIA_TYPE.MANIFEST_V2, MEDIA_TYPE.MANIFEST_OCI)
else 1,
schema_version=(
2
if manifest_data["mediaType"] in (MEDIA_TYPE.MANIFEST_V2, MEDIA_TYPE.MANIFEST_OCI)
else 1
),
media_type=media_type,
config_blob=config_blob,
data=raw_data.decode("utf-8"),
)

# skip if media_type of schema1
Expand All @@ -441,11 +429,11 @@ async def init_pending_content(self, digest, manifest_data, media_type, artifact
blob = await self.save_blob(layer["digest"], manifest)
await sync_to_async(self.repository.pending_blobs.add)(blob)

content_artifact = ContentArtifact(
artifact=artifact, content=manifest, relative_path=manifest.digest
)
with suppress(IntegrityError):
await content_artifact.asave()
# content_artifact = ContentArtifact(
# artifact=artifact, content=manifest, relative_path=manifest.digest
# )
# with suppress(IntegrityError):
# await content_artifact.asave()

async def save_blob(self, digest, manifest):
blob = Blob(digest=digest)
Expand Down
Loading

0 comments on commit f3e4279

Please sign in to comment.