Skip to content

Commit 95d534b

Browse files
committed
Replicas should atomically update their distributions
It's not ideal for distributions to pick up replica changes at random time intervals as various tasks complete, ideally the entire replica is presented as updated at once (or with the smallest possible window). closes #7333 Assisted-By: claude-opus-4.6
1 parent cecbf50 commit 95d534b

4 files changed

Lines changed: 44 additions & 12 deletions

File tree

CHANGES/7333.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Pulp Replicas now present all of their distribution updates at the end of the replication process, rather than each individual repository-distribution pair being updated individually as syncs and publishes are completed.

pulpcore/app/replica.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,11 @@ def distribution_extra_fields(self, repository, upstream_distribution):
172172
"""
173173
Return the fields that need to be updated/cleared on distributions for idempotence.
174174
"""
175+
latest = repository.latest_version()
176+
repo_version_href = get_url(repository) + "versions/{}/".format(latest.number)
175177
return {
176-
"repository": get_url(repository),
178+
"repository": None,
179+
"repository_version": repo_version_href,
177180
"publication": None,
178181
"base_path": upstream_distribution["base_path"],
179182
}
@@ -187,7 +190,15 @@ def create_or_update_distribution(self, repository, upstream_distribution):
187190
)
188191
if not self._is_managed(distro):
189192
return None
190-
needs_update = self.needs_update(distribution_data, distro)
193+
# Don't update repository_version here — that happens atomically in
194+
# finalize_replication after all syncs complete.
195+
update_data = {
196+
k: v
197+
for k, v in distribution_data.items()
198+
if k not in ("repository_version", "repository", "publication")
199+
}
200+
update_data["pulp_labels"] = distribution_data["pulp_labels"]
201+
needs_update = self.needs_update(update_data, distro)
191202
if needs_update:
192203
# Update the distribution
193204
dispatch(
@@ -197,7 +208,7 @@ def create_or_update_distribution(self, repository, upstream_distribution):
197208
exclusive_resources=self.distros_uris,
198209
args=(distro.pk, self.app_label, self.distribution_serializer_name),
199210
kwargs={
200-
"data": distribution_data,
211+
"data": update_data,
201212
"partial": True,
202213
},
203214
)

pulpcore/app/tasks/replica.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33
import sys
44
from tempfile import NamedTemporaryFile
55

6+
from django.db import transaction
67
from django.db.models import Min
78

89
from pulpcore.constants import TASK_STATES
910
from pulpcore.app.apps import pulp_plugin_configs, PulpAppConfig
10-
from pulpcore.app.models import UpstreamPulp, Task, TaskGroup
11+
from pulpcore.app.models import Distribution, Repository, UpstreamPulp, Task, TaskGroup
1112
from pulpcore.app.replica import ReplicaContext
1213
from pulpcore.tasking.tasks import dispatch
1314

@@ -77,6 +78,7 @@ def replicate_distributions(server_pk):
7778
replicator = replicator_class(ctx, task_group, tls_settings, server)
7879
supported_replicators.append(replicator)
7980

81+
distro_repo_pairs = []
8082
for replicator in supported_replicators:
8183
distros = replicator.upstream_distributions(q=server.q_select)
8284
distro_names = []
@@ -90,7 +92,7 @@ def replicate_distributions(server_pk):
9092
# Check if there is already a repository
9193
repository = replicator.create_or_update_repository(remote=remote)
9294
if not repository:
93-
# No update occured because server.policy==LABELED and there was
95+
# No update occurred because server.policy==LABELED and there was
9496
# an already existing local repository with the same name
9597
continue
9698

@@ -103,24 +105,35 @@ def replicate_distributions(server_pk):
103105

104106
# Add name to the list of known distribution names
105107
distro_names.append(distro["name"])
108+
distro_repo_pairs.append((distro["name"], str(repository.pk)))
106109

107110
replicator.remove_missing(distro_names)
108111

109112
dispatch(
110113
finalize_replication,
111114
task_group=task_group,
112115
exclusive_resources=[server],
113-
args=[server.pk],
116+
args=[server.pk, distro_repo_pairs],
114117
)
115118

116119

117-
def finalize_replication(server_pk):
120+
def finalize_replication(server_pk, distro_repo_pairs):
118121
task = Task.current()
119122
task_group = TaskGroup.current()
120123
server = UpstreamPulp.objects.get(pk=server_pk)
121124
if task_group.tasks.exclude(pk=task.pk).exclude(state=TASK_STATES.COMPLETED).exists():
122125
raise Exception("Replication failed.")
123126

127+
# Atomically update all managed distributions to point to their repo's latest version.
128+
with transaction.atomic():
129+
for distro_name, repo_pk in distro_repo_pairs:
130+
distro = Distribution.objects.get(name=distro_name, pulp_domain=server.pulp_domain)
131+
repo = Repository.objects.get(pk=repo_pk)
132+
latest_version = repo.latest_version()
133+
if latest_version and distro.repository_version != latest_version:
134+
distro.repository_version = latest_version
135+
distro.save(update_fields=["repository_version"])
136+
124137
# Record timestamp of last successful replication.
125138
started_at = task_group.tasks.aggregate(Min("started_at"))["started_at__min"]
126139
server.set_last_replication_timestamp(started_at)

pulpcore/tests/functional/api/test_replication.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,13 @@ def test_replication_idempotence(
117117
assert "UpstreamPulp" in obj.pulp_labels
118118
assert upstream_pulp.prn.split(":")[-1] == obj.pulp_labels["UpstreamPulp"]
119119

120+
# Verify the replica distribution uses repository_version (not repository)
121+
replica_distro = file_bindings.DistributionsFileApi.list(
122+
pulp_domain=replica_domain.name
123+
).results[0]
124+
assert replica_distro.repository is None
125+
assert replica_distro.repository_version is not None
126+
120127
# Now replicate backwards
121128

122129
upstream_pulp_body = {
@@ -147,7 +154,9 @@ def test_replication_idempotence(
147154
assert result.count == 1
148155
new_distribution = result.results[0]
149156
assert new_distribution.pulp_href == distro.pulp_href
150-
assert new_distribution.repository == new_repository.pulp_href
157+
assert new_distribution.repository is None
158+
assert new_distribution.repository_version is not None
159+
assert new_distribution.repository_version.startswith(new_repository.pulp_href)
151160
assert new_distribution.publication is None
152161
assert "UpstreamPulp" in new_distribution.pulp_labels
153162
assert upstream_pulp2.prn.split(":")[-1] == new_distribution.pulp_labels["UpstreamPulp"]
@@ -373,11 +382,9 @@ def _check_replication(
373382
assert upstream_pulp.last_replication > old_replication
374383

375384
# check if the content was correctly replicated
376-
local_version = file_bindings.RepositoriesFileApi.read(
377-
distribution.repository
378-
).latest_version_href
385+
assert distribution.repository_version is not None
379386
local_present = file_bindings.RepositoriesFileVersionsApi.read(
380-
local_version
387+
distribution.repository_version
381388
).content_summary.present
382389
upstream_version = file_bindings.PublicationsFileApi.read(
383390
upstream_distribution.publication

0 commit comments

Comments
 (0)