Skip to content

Commit 8434df5

Browse files
committed
Add additional tests for replication
Generated-By: claude-opus-4.6
1 parent a92f9e2 commit 8434df5

3 files changed

Lines changed: 203 additions & 36 deletions

File tree

pulpcore/app/replica.py

Lines changed: 5 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -174,18 +174,10 @@ def distribution_extra_fields(self, repository, upstream_distribution):
174174
"""
175175
Return the fields that need to be updated/cleared on distributions for idempotence.
176176
177-
Note: repository_version is computed here but filtered out for updates/creates.
178-
It will be set atomically in finalize_replication after sync completes.
177+
Note: repository, publication, and repository_version are NOT included here.
178+
They are all updated atomically in finalize_replication after all syncs complete.
179179
"""
180-
latest = repository.latest_version()
181-
if latest:
182-
repo_version_href = get_url(repository) + "versions/{}/".format(latest.number)
183-
else:
184-
repo_version_href = None
185180
return {
186-
"repository": None,
187-
"repository_version": repo_version_href,
188-
"publication": None,
189181
"base_path": upstream_distribution["base_path"],
190182
}
191183

@@ -198,35 +190,22 @@ def create_or_update_distribution(self, repository, upstream_distribution):
198190
)
199191
if not self._is_managed(distro):
200192
return None
201-
# Don't update repository_version here — that happens atomically in
202-
# finalize_replication after all syncs complete.
203-
# Do clear repository and publication so they don't conflict.
204-
update_data = {k: v for k, v in distribution_data.items() if k != "repository_version"}
205-
needs_update = self.needs_update(update_data, distro)
193+
needs_update = self.needs_update(distribution_data, distro)
206194
if needs_update:
207-
# Update the distribution
208195
dispatch(
209196
ageneral_update,
210197
task_group=self.task_group,
211198
shared_resources=[repository, self.server],
212199
exclusive_resources=self.distros_uris,
213200
args=(distro.pk, self.app_label, self.distribution_serializer_name),
214201
kwargs={
215-
"data": update_data,
202+
"data": distribution_data,
216203
"partial": True,
217204
},
218205
)
219206
except self.distribution_model_cls.DoesNotExist:
220-
# Dispatch a task to create the distribution
221-
# Don't set repository_version for new distributions - it will be set in
222-
# finalize_replication after sync completes.
223-
create_data = {
224-
k: v
225-
for k, v in distribution_data.items()
226-
if k not in ("repository_version", "repository", "publication")
227-
}
207+
create_data = dict(distribution_data)
228208
create_data["name"] = upstream_distribution["name"]
229-
create_data["pulp_labels"] = distribution_data["pulp_labels"]
230209
dispatch(
231210
general_create,
232211
task_group=self.task_group,

pulpcore/app/tasks/replica.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,16 +124,24 @@ def finalize_replication(server_pk, distro_repo_pairs):
124124
if task_group.tasks.exclude(pk=task.pk).exclude(state=TASK_STATES.COMPLETED).exists():
125125
raise Exception("Replication failed.")
126126

127-
# Atomically update all managed distributions to point to their repo's latest version.
127+
# Atomically update all managed distributions to point to their repo's latest version,
128+
# clearing any previous repository or publication references.
128129
with transaction.atomic():
129130
for distro_name, repo_pk in distro_repo_pairs:
130131
distro = Distribution.objects.get(name=distro_name, pulp_domain=server.pulp_domain)
131132
repo = Repository.objects.get(pk=repo_pk)
132133
latest_version = repo.latest_version()
133134
if latest_version:
134-
if distro.repository_version != latest_version:
135+
needs_update = (
136+
distro.repository_version != latest_version
137+
or distro.repository is not None
138+
or distro.publication is not None
139+
)
140+
if needs_update:
141+
distro.repository = None
142+
distro.publication = None
135143
distro.repository_version = latest_version
136-
distro.save(update_fields=["repository_version"])
144+
distro.save(update_fields=["repository", "publication", "repository_version"])
137145

138146
# Record timestamp of last successful replication.
139147
started_at = task_group.tasks.aggregate(Min("started_at"))["started_at__min"]

pulpcore/tests/functional/api/test_replication.py

Lines changed: 187 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ def test_replication(
1313
pulp_settings,
1414
gen_object_with_cleanup,
1515
):
16-
# This test assures that an Upstream Pulp can be created in a non-default domain and that this
17-
# Upstream Pulp configuration can be used to execute the replicate task.
16+
"""This test assures that an Upstream Pulp can be created in a non-default domain and that this
17+
Upstream Pulp configuration can be used to execute the replicate task."""
1818

1919
# Create a non-default domain
2020
non_default_domain = domain_factory()
@@ -57,8 +57,8 @@ def test_replication_idempotence(
5757
tmp_path,
5858
add_domain_objects_to_cleanup,
5959
):
60-
# This test assures that an Upstream Pulp can be created in a non-default domain and that this
61-
# Upstream Pulp configuration can be used to execute the replicate task.
60+
"""This test assures that an Upstream Pulp can be created in a non-default domain and that this
61+
Upstream Pulp configuration can be used to execute the replicate task."""
6262

6363
# Create a domain to replicate from
6464
source_domain = domain_factory()
@@ -168,6 +168,186 @@ def test_replication_idempotence(
168168
assert upstream_pulp2.prn.split(":")[-1] == new_remote.pulp_labels["UpstreamPulp"]
169169

170170

171+
@pytest.mark.parallel
172+
def test_replication_with_repo_based_distribution(
173+
domain_factory,
174+
bindings_cfg,
175+
pulpcore_bindings,
176+
file_bindings,
177+
monitor_task,
178+
monitor_task_group,
179+
pulp_settings,
180+
gen_object_with_cleanup,
181+
file_distribution_factory,
182+
file_repository_factory,
183+
file_remote_factory,
184+
basic_manifest_path,
185+
add_domain_objects_to_cleanup,
186+
):
187+
"""Test replication when upstream distribution uses repository (not publication)."""
188+
source_domain = domain_factory()
189+
add_domain_objects_to_cleanup(source_domain)
190+
191+
# Create a repo with autopublish, sync it, and distribute via repository (not publication)
192+
remote = file_remote_factory(
193+
pulp_domain=source_domain.name, manifest_path=basic_manifest_path, policy="immediate"
194+
)
195+
repo = file_repository_factory(pulp_domain=source_domain.name, autopublish=True)
196+
sync_data = file_bindings.module.RepositorySyncURL(remote=remote.pulp_href, mirror=True)
197+
monitor_task(file_bindings.RepositoriesFileApi.sync(repo.pulp_href, sync_data).task)
198+
_ = file_distribution_factory(pulp_domain=source_domain.name, repository=repo.pulp_href)
199+
200+
# Replicate
201+
replica_domain = domain_factory()
202+
add_domain_objects_to_cleanup(replica_domain)
203+
upstream_pulp = gen_object_with_cleanup(
204+
pulpcore_bindings.UpstreamPulpsApi,
205+
{
206+
"name": str(uuid.uuid4()),
207+
"base_url": bindings_cfg.host,
208+
"api_root": pulp_settings.API_ROOT,
209+
"domain": source_domain.name,
210+
"username": bindings_cfg.username,
211+
"password": bindings_cfg.password,
212+
},
213+
pulp_domain=replica_domain.name,
214+
)
215+
response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href)
216+
monitor_task_group(response.task_group)
217+
218+
# Verify replica distribution uses repository_version, not repository or publication
219+
replica_distro = file_bindings.DistributionsFileApi.list(
220+
pulp_domain=replica_domain.name
221+
).results[0]
222+
assert replica_distro.repository is None
223+
assert replica_distro.repository_version is not None
224+
assert replica_distro.publication is None
225+
226+
# Verify content was replicated
227+
source_repo = file_bindings.RepositoriesFileApi.read(repo.pulp_href)
228+
source_present = file_bindings.RepositoriesFileVersionsApi.read(
229+
source_repo.latest_version_href
230+
).content_summary.present
231+
replica_present = file_bindings.RepositoriesFileVersionsApi.read(
232+
replica_distro.repository_version
233+
).content_summary.present
234+
assert source_present["file.file"]["count"] == replica_present["file.file"]["count"]
235+
236+
237+
@pytest.mark.parallel
238+
def test_replication_multi_distribution_content_update(
239+
domain_factory,
240+
bindings_cfg,
241+
pulpcore_bindings,
242+
file_bindings,
243+
monitor_task,
244+
monitor_task_group,
245+
pulp_settings,
246+
gen_object_with_cleanup,
247+
file_distribution_factory,
248+
file_repository_factory,
249+
file_publication_factory,
250+
add_domain_objects_to_cleanup,
251+
tmp_path,
252+
):
253+
"""Test that all distributions are updated after re-replication with new content."""
254+
source_domain = domain_factory()
255+
add_domain_objects_to_cleanup(source_domain)
256+
257+
# Create 3 repos with content and publication-based distributions
258+
distros = []
259+
repos = []
260+
for i in range(3):
261+
repo = file_repository_factory(pulp_domain=source_domain.name)
262+
repos.append(repo)
263+
file_path = tmp_path / f"file_{i}.txt"
264+
file_path.write_text(f"content_{i}")
265+
monitor_task(
266+
file_bindings.ContentFilesApi.create(
267+
file=str(file_path),
268+
relative_path=f"file_{i}.txt",
269+
repository=repo.pulp_href,
270+
pulp_domain=source_domain.name,
271+
).task
272+
)
273+
pub = file_publication_factory(pulp_domain=source_domain.name, repository=repo.pulp_href)
274+
distros.append(
275+
file_distribution_factory(pulp_domain=source_domain.name, publication=pub.pulp_href)
276+
)
277+
278+
# Initial replication
279+
replica_domain = domain_factory()
280+
add_domain_objects_to_cleanup(replica_domain)
281+
upstream_pulp = gen_object_with_cleanup(
282+
pulpcore_bindings.UpstreamPulpsApi,
283+
{
284+
"name": str(uuid.uuid4()),
285+
"base_url": bindings_cfg.host,
286+
"api_root": pulp_settings.API_ROOT,
287+
"domain": source_domain.name,
288+
"username": bindings_cfg.username,
289+
"password": bindings_cfg.password,
290+
},
291+
pulp_domain=replica_domain.name,
292+
)
293+
response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href)
294+
monitor_task_group(response.task_group)
295+
296+
# Record initial versions
297+
replica_distros = file_bindings.DistributionsFileApi.list(
298+
pulp_domain=replica_domain.name
299+
).results
300+
assert len(replica_distros) == 3
301+
initial_versions = {}
302+
for rd in replica_distros:
303+
assert rd.repository is None
304+
assert rd.repository_version is not None
305+
assert rd.publication is None
306+
initial_versions[rd.name] = rd.repository_version
307+
308+
# Add new content to all source repos and update publications
309+
for i, repo in enumerate(repos):
310+
file_path = tmp_path / f"file_{i}_v2.txt"
311+
file_path.write_text(f"new_content_{i}")
312+
monitor_task(
313+
file_bindings.ContentFilesApi.create(
314+
file=str(file_path),
315+
relative_path=f"file_{i}_v2.txt",
316+
repository=repo.pulp_href,
317+
pulp_domain=source_domain.name,
318+
).task
319+
)
320+
repo = file_bindings.RepositoriesFileApi.read(repo.pulp_href)
321+
pub = file_publication_factory(
322+
pulp_domain=source_domain.name,
323+
repository_version=repo.latest_version_href,
324+
)
325+
monitor_task(
326+
file_bindings.DistributionsFileApi.partial_update(
327+
distros[i].pulp_href, {"publication": pub.pulp_href}
328+
).task
329+
)
330+
331+
# Re-replicate
332+
response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href)
333+
monitor_task_group(response.task_group)
334+
335+
# Verify all distributions were updated to new versions with new content
336+
replica_distros = file_bindings.DistributionsFileApi.list(
337+
pulp_domain=replica_domain.name
338+
).results
339+
assert len(replica_distros) == 3
340+
for rd in replica_distros:
341+
assert rd.repository is None
342+
assert rd.repository_version is not None
343+
assert rd.publication is None
344+
# Version should have changed
345+
assert rd.repository_version != initial_versions[rd.name]
346+
# Verify new content is present (original file + new file = 2)
347+
version = file_bindings.RepositoriesFileVersionsApi.read(rd.repository_version)
348+
assert version.content_summary.present["file.file"]["count"] == 2
349+
350+
171351
@pytest.mark.parallel
172352
def test_replication_with_wrong_ca_cert(
173353
domain_factory,
@@ -177,9 +357,9 @@ def test_replication_with_wrong_ca_cert(
177357
pulp_settings,
178358
gen_object_with_cleanup,
179359
):
180-
# This test assures that setting ca_cert on an Upstream Pulp causes that CA bundle to be used
181-
# to verify the certificate presented by the Upstream Pulp's REST API. The replication tasks
182-
# are expected to fail.
360+
"""This test assures that setting ca_cert on an Upstream Pulp causes that CA bundle to be used
361+
to verify the certificate presented by the Upstream Pulp's REST API. The replication tasks
362+
are expected to fail."""
183363

184364
if not bindings_cfg.host.startswith("https"):
185365
pytest.skip("HTTPS is not enabled for Pulp's API.")

0 commit comments

Comments
 (0)