Skip to content

Commit 88a62e8

Browse files
committed
Replicas should atomically update their distributions
It's not ideal for distributions to pick up replica changes at random time intervals as various tasks complete, ideally the entire replica is presented as updated at once (or with the smallest possible window). closes #7333 Assisted-By: claude-opus-4.6
1 parent 3cd99e7 commit 88a62e8

5 files changed

Lines changed: 67 additions & 16 deletions

File tree

CHANGES/7333.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Pulp Replicas now present all of their distribution updates at the end of the replication process, rather than each individual repository-distribution pair being updated individually as syncs and publishes are completed.

pulp_file/app/replica.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@ def url(self, upstream_distribution):
3333
manifest = self.publication_ctx_cls(
3434
self.pulp_ctx, upstream_distribution["publication"]
3535
).entity["manifest"]
36+
elif upstream_distribution.get("repository_version"):
37+
# Extract repository href from repository_version href
38+
repo_href = upstream_distribution["repository_version"].rsplit("versions/", 1)[0]
39+
manifest = self.repository_ctx_cls(
40+
self.pulp_ctx, repo_href
41+
).entity["manifest"]
3642
else:
3743
# This distribution doesn't serve any content
3844
return None

pulpcore/app/replica.py

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,10 @@ def remote_extra_fields(self, upstream_distribution):
106106
return {}
107107

108108
def create_or_update_remote(self, upstream_distribution):
109-
if not upstream_distribution.get("repository") and not upstream_distribution.get(
110-
"publication"
109+
if (
110+
not upstream_distribution.get("repository")
111+
and not upstream_distribution.get("repository_version")
112+
and not upstream_distribution.get("publication")
111113
):
112114
return None
113115
url = self.url(upstream_distribution)
@@ -171,9 +173,18 @@ def create_or_update_repository(self, remote):
171173
def distribution_extra_fields(self, repository, upstream_distribution):
172174
"""
173175
Return the fields that need to be updated/cleared on distributions for idempotence.
176+
177+
Note: repository_version is computed here but filtered out for updates/creates.
178+
It will be set atomically in finalize_replication after sync completes.
174179
"""
180+
latest = repository.latest_version()
181+
if latest:
182+
repo_version_href = get_url(repository) + "versions/{}/".format(latest.number)
183+
else:
184+
repo_version_href = None
175185
return {
176-
"repository": get_url(repository),
186+
"repository": None,
187+
"repository_version": repo_version_href,
177188
"publication": None,
178189
"base_path": upstream_distribution["base_path"],
179190
}
@@ -187,7 +198,11 @@ def create_or_update_distribution(self, repository, upstream_distribution):
187198
)
188199
if not self._is_managed(distro):
189200
return None
190-
needs_update = self.needs_update(distribution_data, distro)
201+
# Don't update repository_version here — that happens atomically in
202+
# finalize_replication after all syncs complete.
203+
# Do clear repository and publication so they don't conflict.
204+
update_data = {k: v for k, v in distribution_data.items() if k != "repository_version"}
205+
needs_update = self.needs_update(update_data, distro)
191206
if needs_update:
192207
# Update the distribution
193208
dispatch(
@@ -197,20 +212,28 @@ def create_or_update_distribution(self, repository, upstream_distribution):
197212
exclusive_resources=self.distros_uris,
198213
args=(distro.pk, self.app_label, self.distribution_serializer_name),
199214
kwargs={
200-
"data": distribution_data,
215+
"data": update_data,
201216
"partial": True,
202217
},
203218
)
204219
except self.distribution_model_cls.DoesNotExist:
205220
# Dispatch a task to create the distribution
206-
distribution_data["name"] = upstream_distribution["name"]
221+
# Don't set repository_version for new distributions - it will be set in
222+
# finalize_replication after sync completes.
223+
create_data = {
224+
k: v
225+
for k, v in distribution_data.items()
226+
if k not in ("repository_version", "repository", "publication")
227+
}
228+
create_data["name"] = upstream_distribution["name"]
229+
create_data["pulp_labels"] = distribution_data["pulp_labels"]
207230
dispatch(
208231
general_create,
209232
task_group=self.task_group,
210233
shared_resources=[repository, self.server],
211234
exclusive_resources=self.distros_uris,
212235
args=(self.app_label, self.distribution_serializer_name),
213-
kwargs={"data": distribution_data},
236+
kwargs={"data": create_data},
214237
)
215238

216239
def sync_params(self, repository, remote):

pulpcore/app/tasks/replica.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33
import sys
44
from tempfile import NamedTemporaryFile
55

6+
from django.db import transaction
67
from django.db.models import Min
78

89
from pulpcore.constants import TASK_STATES
910
from pulpcore.app.apps import pulp_plugin_configs, PulpAppConfig
10-
from pulpcore.app.models import UpstreamPulp, Task, TaskGroup
11+
from pulpcore.app.models import Distribution, Repository, UpstreamPulp, Task, TaskGroup
1112
from pulpcore.app.replica import ReplicaContext
1213
from pulpcore.tasking.tasks import dispatch
1314

@@ -77,6 +78,7 @@ def replicate_distributions(server_pk):
7778
replicator = replicator_class(ctx, task_group, tls_settings, server)
7879
supported_replicators.append(replicator)
7980

81+
distro_repo_pairs = []
8082
for replicator in supported_replicators:
8183
distros = replicator.upstream_distributions(q=server.q_select)
8284
distro_names = []
@@ -90,7 +92,7 @@ def replicate_distributions(server_pk):
9092
# Check if there is already a repository
9193
repository = replicator.create_or_update_repository(remote=remote)
9294
if not repository:
93-
# No update occured because server.policy==LABELED and there was
95+
# No update occurred because server.policy==LABELED and there was
9496
# an already existing local repository with the same name
9597
continue
9698

@@ -103,24 +105,36 @@ def replicate_distributions(server_pk):
103105

104106
# Add name to the list of known distribution names
105107
distro_names.append(distro["name"])
108+
distro_repo_pairs.append((distro["name"], str(repository.pk)))
106109

107110
replicator.remove_missing(distro_names)
108111

109112
dispatch(
110113
finalize_replication,
111114
task_group=task_group,
112115
exclusive_resources=[server],
113-
args=[server.pk],
116+
args=[server.pk, distro_repo_pairs],
114117
)
115118

116119

117-
def finalize_replication(server_pk):
120+
def finalize_replication(server_pk, distro_repo_pairs):
118121
task = Task.current()
119122
task_group = TaskGroup.current()
120123
server = UpstreamPulp.objects.get(pk=server_pk)
121124
if task_group.tasks.exclude(pk=task.pk).exclude(state=TASK_STATES.COMPLETED).exists():
122125
raise Exception("Replication failed.")
123126

127+
# Atomically update all managed distributions to point to their repo's latest version.
128+
with transaction.atomic():
129+
for distro_name, repo_pk in distro_repo_pairs:
130+
distro = Distribution.objects.get(name=distro_name, pulp_domain=server.pulp_domain)
131+
repo = Repository.objects.get(pk=repo_pk)
132+
latest_version = repo.latest_version()
133+
if latest_version:
134+
if distro.repository_version != latest_version:
135+
distro.repository_version = latest_version
136+
distro.save(update_fields=["repository_version"])
137+
124138
# Record timestamp of last successful replication.
125139
started_at = task_group.tasks.aggregate(Min("started_at"))["started_at__min"]
126140
server.set_last_replication_timestamp(started_at)

pulpcore/tests/functional/api/test_replication.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,13 @@ def test_replication_idempotence(
117117
assert "UpstreamPulp" in obj.pulp_labels
118118
assert upstream_pulp.prn.split(":")[-1] == obj.pulp_labels["UpstreamPulp"]
119119

120+
# Verify the replica distribution uses repository_version (not repository)
121+
replica_distro = file_bindings.DistributionsFileApi.list(
122+
pulp_domain=replica_domain.name
123+
).results[0]
124+
assert replica_distro.repository is None
125+
assert replica_distro.repository_version is not None
126+
120127
# Now replicate backwards
121128

122129
upstream_pulp_body = {
@@ -147,7 +154,9 @@ def test_replication_idempotence(
147154
assert result.count == 1
148155
new_distribution = result.results[0]
149156
assert new_distribution.pulp_href == distro.pulp_href
150-
assert new_distribution.repository == new_repository.pulp_href
157+
assert new_distribution.repository is None
158+
assert new_distribution.repository_version is not None
159+
assert new_distribution.repository_version.startswith(new_repository.pulp_href)
151160
assert new_distribution.publication is None
152161
assert "UpstreamPulp" in new_distribution.pulp_labels
153162
assert upstream_pulp2.prn.split(":")[-1] == new_distribution.pulp_labels["UpstreamPulp"]
@@ -373,11 +382,9 @@ def _check_replication(
373382
assert upstream_pulp.last_replication > old_replication
374383

375384
# check if the content was correctly replicated
376-
local_version = file_bindings.RepositoriesFileApi.read(
377-
distribution.repository
378-
).latest_version_href
385+
assert distribution.repository_version is not None
379386
local_present = file_bindings.RepositoriesFileVersionsApi.read(
380-
local_version
387+
distribution.repository_version
381388
).content_summary.present
382389
upstream_version = file_bindings.PublicationsFileApi.read(
383390
upstream_distribution.publication

0 commit comments

Comments
 (0)