Skip to content

Commit 8f33103

Browse files
jdrew82 and claude committed
feat: Add sync_stages for ordered concurrent sync execution
Add a `sync_stages` ClassVar on `Adapter` to control the order in which model types are processed during concurrent sync. Stages execute sequentially, while the elements within each stage run in parallel via `ThreadPoolExecutor`.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent d3ad91b commit 8f33103

File tree

3 files changed

+244
-5
lines changed

3 files changed

+244
-5
lines changed

diffsync/__init__.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,22 @@ class Adapter: # pylint: disable=too-many-public-methods
487487
top_level: ClassVar[List[str]] = []
488488
"""List of top-level modelnames to begin from when diffing or synchronizing."""
489489

490+
sync_stages: ClassVar[Optional[List[List[str]]]] = None
"""Optional ordered groups of top-level model types for staged concurrent sync.

Each inner list is a "stage" of model types that can safely execute in parallel.
Stages are processed sequentially — all elements in stage N complete before stage N+1 begins.
Only used when ``concurrent=True``; ignored for serial sync.

Every name must also be listed in ``top_level`` and may appear in at most one stage;
otherwise ``AttributeError`` is raised when the subclass is defined.
Top-level types that are not listed in any stage are still synced, serially,
after the last stage has finished.

Example::

    sync_stages = [
        ["site", "vlan"],  # stage 1: independent types, run in parallel
        ["device"],        # stage 2: depends on sites
        ["interface"],     # stage 3: depends on devices
    ]
"""
505+
490506
def __init__(
491507
self,
492508
name: Optional[str] = None,
@@ -528,6 +544,21 @@ def __init_subclass__(cls) -> None:
528544
if not isclass(value) or not issubclass(value, DiffSyncModel):
529545
raise AttributeError(f'top_level references attribute "{name}" but it is not a DiffSyncModel subclass!')
530546

547+
# Validate the optional sync_stages declaration: every entry must be a known
# top-level model name and may appear in only one stage.
if cls.sync_stages is not None:
    top_level_set = set(cls.top_level)
    seen: set = set()
    for stage in cls.sync_stages:
        # A flat declaration such as sync_stages = ["region", "tenant"] would otherwise be
        # iterated character by character and reported as '"r" is not in top_level'.
        if isinstance(stage, str):
            raise AttributeError(
                f'sync_stages stage "{stage}" must be a list of model names, not a string!'
            )
        for model_type in stage:
            if model_type not in top_level_set:
                raise AttributeError(
                    f'sync_stages references "{model_type}" but it is not in top_level!'
                )
            # Reject repeats, whether within one stage or across stages.
            if model_type in seen:
                raise AttributeError(
                    f'sync_stages contains duplicate entry "{model_type}"!'
                )
            seen.add(model_type)
561+
531562
def __new__(cls, **kwargs): # type: ignore[no-untyped-def]
532563
"""Document keyword arguments that were used to initialize Adapter."""
533564
meta_kwargs = {}
@@ -687,6 +718,7 @@ def sync_from( # pylint: disable=too-many-arguments,R0917,too-many-locals
687718
batch_size=batch_size,
688719
concurrent=concurrent,
689720
max_workers=max_workers,
721+
sync_stages=self.sync_stages,
690722
)
691723
result = syncer.perform_sync()
692724
if result:

diffsync/helpers.py

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,7 @@ def __init__( # pylint: disable=too-many-arguments,R0917
346346
batch_size: Optional[int] = None,
347347
concurrent: bool = False,
348348
max_workers: Optional[int] = None,
349+
sync_stages: Optional[List[List[str]]] = None,
349350
):
350351
"""Create a DiffSyncSyncer instance, ready to call `perform_sync()` against."""
351352
self.diff = diff
@@ -363,6 +364,7 @@ def __init__( # pylint: disable=too-many-arguments,R0917
363364
# Feature 3: Parallel sync of independent subtrees
364365
self.concurrent = concurrent
365366
self.max_workers = max_workers
367+
self.sync_stages = sync_stages
366368

367369
# Feature 4: Structured operations summary
368370
self.operations: Dict[str, Dict[str, List[Dict]]] = {}
@@ -397,11 +399,30 @@ def perform_sync(self) -> bool:
397399

398400
# Feature 3: Parallel sync of independent subtrees
399401
if self.concurrent:
400-
elements = list(self.diff.get_children())
401-
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
402-
futures = {executor.submit(self.sync_diff_element, element): element for element in elements}
403-
for future in as_completed(futures):
404-
changed |= future.result()
402+
if self.sync_stages:
    # Staged concurrent execution: process each stage sequentially,
    # parallelizing elements within each stage.
    # Snapshot the top-level children once (as the unstaged branch below does)
    # instead of re-walking the diff for every stage and again for the fallback.
    children = list(self.diff.get_children())
    staged_types = {model_type for stage in self.sync_stages for model_type in stage}

    for stage in self.sync_stages:
        stage_set = set(stage)
        stage_elements = [el for el in children if el.type in stage_set]
        if stage_elements:
            # Leaving the ``with`` block waits for every submitted task, so the
            # next stage cannot start before this one has finished.
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                futures = [executor.submit(self.sync_diff_element, el) for el in stage_elements]
                for future in as_completed(futures):
                    # future.result() re-raises any exception raised by the worker.
                    changed |= future.result()

    # Handle any elements whose type is not covered by sync_stages (serial fallback)
    for element in children:
        if element.type not in staged_types:
            changed |= self.sync_diff_element(element)
else:
    # No stages defined — all elements in one pool (original behavior)
    elements = list(self.diff.get_children())
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        futures = [executor.submit(self.sync_diff_element, element) for element in elements]
        for future in as_completed(futures):
            changed |= future.result()
405426
else:
406427
for element in self.diff.get_children():
407428
changed |= self.sync_diff_element(element)

tests/unit/test_diffsync_diff_and_sync_parameters.py

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,3 +442,189 @@ def test_filters_combined_with_sync_attrs():
442442
diffs = de.get_attrs_diffs()
443443
if "+" in diffs:
444444
assert "tag" not in diffs["+"]
445+
446+
447+
# ---------------------------------------------------------------------------
448+
# sync_stages — ordered group execution for concurrent sync
449+
# ---------------------------------------------------------------------------
450+
451+
452+
# Models and adapters for sync_stages tests — uses multiple top-level types
453+
# to exercise staged parallelism.
454+
455+
# Shared log of ("<model type>", <name identifier>) tuples. The create() classmethods of the
# test models append to it, and the sync_stages tests clear it before syncing.
_creation_order: List = []
456+
457+
458+
class _Region(DiffSyncModel):
    """Test model of type ``region`` that logs every ``create`` call to ``_creation_order``."""

    _modelname = "region"
    _identifiers = ("name",)
    _attributes = ("slug",)

    name: str
    slug: str = ""

    @classmethod
    def create(cls, adapter, ids, attrs):
        """Record ``("region", <name>)`` in ``_creation_order``, then delegate to the parent ``create``."""
        log_entry = ("region", ids["name"])
        _creation_order.append(log_entry)
        return super().create(adapter=adapter, ids=ids, attrs=attrs)
470+
471+
472+
class _Tenant(DiffSyncModel):
    """Test model of type ``tenant`` that logs every ``create`` call to ``_creation_order``."""

    _modelname = "tenant"
    _identifiers = ("name",)
    _attributes = ("group",)

    name: str
    group: str = ""

    @classmethod
    def create(cls, adapter, ids, attrs):
        """Record ``("tenant", <name>)`` in ``_creation_order``, then delegate to the parent ``create``."""
        log_entry = ("tenant", ids["name"])
        _creation_order.append(log_entry)
        return super().create(adapter=adapter, ids=ids, attrs=attrs)
484+
485+
486+
class _Rack(DiffSyncModel):
    """Test model of type ``rack`` that logs every ``create`` call to ``_creation_order``."""

    _modelname = "rack"
    _identifiers = ("name",)
    _attributes = ("site_name",)

    name: str
    site_name: str = ""

    @classmethod
    def create(cls, adapter, ids, attrs):
        """Record ``("rack", <name>)`` in ``_creation_order``, then delegate to the parent ``create``."""
        log_entry = ("rack", ids["name"])
        _creation_order.append(log_entry)
        return super().create(adapter=adapter, ids=ids, attrs=attrs)
498+
499+
500+
class _StagedAdapter(Adapter):
    """Adapter with three top-level types split across two sync stages."""

    region = _Region
    tenant = _Tenant
    rack = _Rack
    top_level = ["region", "tenant", "rack"]
    # Stage 1 (region, tenant): independent, can run in parallel.
    # Stage 2 (rack): depends on regions being created.
    sync_stages = [["region", "tenant"], ["rack"]]
509+
510+
511+
class _UnstagedAdapter(Adapter):
    """Comparison adapter: the same three top-level models, but without ``sync_stages``."""

    region = _Region
    tenant = _Tenant
    rack = _Rack

    top_level = ["region", "tenant", "rack"]
517+
518+
519+
def _make_staged_pair(adapter_cls=_StagedAdapter):
    """Return ``(src, dst)``: a source holding two regions, one tenant and two racks, and an empty destination.

    Both adapters are instances of ``adapter_cls``.
    """
    source, destination = adapter_cls(), adapter_cls()

    seed_models = (
        _Region(name="region1", slug="r1"),
        _Region(name="region2", slug="r2"),
        _Tenant(name="tenant1", group="g1"),
        _Rack(name="rack1", site_name="region1"),
        _Rack(name="rack2", site_name="region2"),
    )
    for model in seed_models:
        source.add(model)

    return source, destination
531+
532+
533+
def test_sync_stages_executes_in_order():
    """Every region and tenant (stage 1) must be created before the first rack (stage 2)."""
    _creation_order.clear()
    src, dst = _make_staged_pair()
    dst.sync_from(src, concurrent=True, max_workers=4)

    # Bucket the position of each creation by model type.
    positions = {"region": [], "tenant": [], "rack": []}
    for index, (model_type, _) in enumerate(_creation_order):
        if model_type in positions:
            positions[model_type].append(index)

    assert len(positions["rack"]) == 2
    assert len(positions["region"]) == 2
    assert len(positions["tenant"]) == 1

    # The last stage-1 creation has to precede the first stage-2 creation.
    last_stage1 = max(positions["region"] + positions["tenant"])
    first_stage2 = min(positions["rack"])
    assert last_stage1 < first_stage2, (
        f"Stage 1 items must all complete before stage 2 begins. "
        f"Order was: {_creation_order}"
    )
555+
556+
557+
def test_sync_stages_parallelizes_within_stage():
    """Both types sharing a stage, and the later-stage type, must all be processed.

    Only checks that each type was created at least once; it does not measure concurrency.
    """
    _creation_order.clear()
    src, dst = _make_staged_pair()
    dst.sync_from(src, concurrent=True, max_workers=4)

    types_created = {model_type for model_type, _ in _creation_order}
    for expected_type in ("region", "tenant", "rack"):
        assert expected_type in types_created
567+
568+
569+
def test_sync_stages_none_preserves_current_behavior():
    """With no ``sync_stages`` declared, a concurrent sync still creates every top-level type."""
    _creation_order.clear()
    src, dst = _make_staged_pair(_UnstagedAdapter)
    dst.sync_from(src, concurrent=True, max_workers=2)

    expected_objects = (("region", "region1"), ("tenant", "tenant1"), ("rack", "rack1"))
    for model_type, identifier in expected_objects:
        assert dst.get_or_none(model_type, identifier) is not None
578+
579+
580+
def test_sync_stages_ignored_when_serial():
    """A serial sync (``concurrent=False``) on a staged adapter still creates the objects."""
    _creation_order.clear()
    src, dst = _make_staged_pair()
    dst.sync_from(src, concurrent=False)

    for model_type, identifier in (("region", "region1"), ("rack", "rack1")):
        assert dst.get_or_none(model_type, identifier) is not None
588+
589+
590+
def test_sync_stages_validation_rejects_unknown_type():
    """Declaring a stage entry that is absent from ``top_level`` must raise ``AttributeError``."""
    import pytest

    expect_rejection = pytest.raises(AttributeError, match="sync_stages.*not in top_level")
    with expect_rejection:

        class _UnknownTypeAdapter(Adapter):
            region = _Region
            top_level = ["region"]
            sync_stages = [["region", "nonexistent"]]
599+
600+
601+
def test_sync_stages_validation_rejects_duplicates():
    """Listing the same type in more than one stage must raise ``AttributeError``."""
    import pytest

    expect_rejection = pytest.raises(AttributeError, match="sync_stages.*duplicate")
    with expect_rejection:

        class _DuplicateStageAdapter(Adapter):
            region = _Region
            tenant = _Tenant
            top_level = ["region", "tenant"]
            sync_stages = [["region", "tenant"], ["region"]]
611+
612+
613+
def test_sync_stages_unstaged_types_still_sync():
    """A type in top_level but not in any stage should still be synced (serially, after all stages)."""

    class _PartialStagesAdapter(Adapter):
        region = _Region
        tenant = _Tenant
        rack = _Rack
        top_level = ["region", "tenant", "rack"]
        sync_stages = [["region"]]  # tenant and rack not staged

    _creation_order.clear()
    src, dst = _make_staged_pair(_PartialStagesAdapter)
    dst.sync_from(src, concurrent=True, max_workers=2)

    # All types should still be synced
    assert dst.get_or_none("region", "region1") is not None
    assert dst.get_or_none("tenant", "tenant1") is not None
    assert dst.get_or_none("rack", "rack1") is not None

    # The unstaged types (tenant, rack) must only be created once the staged type (region) is done.
    staged_indices = [i for i, (t, _) in enumerate(_creation_order) if t == "region"]
    unstaged_indices = [i for i, (t, _) in enumerate(_creation_order) if t != "region"]
    assert staged_indices
    assert unstaged_indices
    assert max(staged_indices) < min(unstaged_indices), (
        f"Unstaged types must be synced after all stages. Order was: {_creation_order}"
    )

0 commit comments

Comments
 (0)