Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions src/dstack/_internal/core/backends/aws/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class AWSCompute(
def __init__(
self,
config: AWSConfig,
quotas_cache: Optional[AWSQuotasCache] = None,
quotas_cache: Optional[ComputeTTLCache] = None,
zones_cache: Optional[ComputeCache] = None,
):
super().__init__()
Expand All @@ -136,7 +136,7 @@ def __init__(
# with more aggressive/longer caches.
self._offers_post_filter_cache = ComputeTTLCache(cache=TTLCache(maxsize=10, ttl=180))
if quotas_cache is None:
quotas_cache = AWSQuotasCache(cache=TTLCache(maxsize=10, ttl=600))
quotas_cache = ComputeTTLCache(cache=TTLCache(maxsize=10, ttl=600))
self._regions_to_quotas_cache = quotas_cache
if zones_cache is None:
zones_cache = ComputeCache(cache=Cache(maxsize=10))
Expand All @@ -154,10 +154,7 @@ def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability
extra_filter=_supported_instances,
)
regions = list(set(i.region for i in offers))
with self._regions_to_quotas_cache.execution_lock:
# Cache lock does not prevent concurrent execution.
# We use a separate lock to avoid requesting quotas in parallel and hitting rate limits.
regions_to_quotas = self._get_regions_to_quotas(self.session, regions)
regions_to_quotas = self._get_regions_to_quotas(self.session, regions)
regions_to_zones = self._get_regions_to_zones(self.session, regions)

availability_offers = []
Expand Down
6 changes: 5 additions & 1 deletion src/dstack/_internal/core/backends/base/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ class ComputeWithAllOffersCached(ABC):
def __init__(self) -> None:
super().__init__()
self._offers_cache_lock = threading.Lock()
self._offers_cache_execution_lock = threading.Lock()
self._offers_cache = TTLCache(maxsize=1, ttl=180)

@abstractmethod
Expand Down Expand Up @@ -206,7 +207,10 @@ def get_offers_post_filter(
return None

def get_offers(self, requirements: Requirements) -> Iterator[InstanceOfferWithAvailability]:
cached_offers = self._get_all_offers_with_availability_cached()
with self._offers_cache_execution_lock:
# Cache lock does not prevent concurrent execution.
# We use a separate lock to avoid requesting offers in parallel, which would redo the same work and risk hitting rate limits.
cached_offers = self._get_all_offers_with_availability_cached()
offers = self.__apply_modifiers(cached_offers, self.get_offers_modifiers(requirements))
offers = filter_offers_by_requirements(offers, requirements)
post_filter = self.get_offers_post_filter(requirements)
Expand Down
Loading