Skip to content

Commit 869754b

Browse files
authored
Guard cached get_offers with an execution lock (#3738)
1 parent 64bb005 commit 869754b

File tree

2 files changed

+8
-7
lines changed

2 files changed

+8
-7
lines changed

src/dstack/_internal/core/backends/aws/compute.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ class AWSCompute(
119119
def __init__(
120120
self,
121121
config: AWSConfig,
122-
quotas_cache: Optional[AWSQuotasCache] = None,
122+
quotas_cache: Optional[ComputeTTLCache] = None,
123123
zones_cache: Optional[ComputeCache] = None,
124124
):
125125
super().__init__()
@@ -136,7 +136,7 @@ def __init__(
136136
# with more aggressive/longer caches.
137137
self._offers_post_filter_cache = ComputeTTLCache(cache=TTLCache(maxsize=10, ttl=180))
138138
if quotas_cache is None:
139-
quotas_cache = AWSQuotasCache(cache=TTLCache(maxsize=10, ttl=600))
139+
quotas_cache = ComputeTTLCache(cache=TTLCache(maxsize=10, ttl=600))
140140
self._regions_to_quotas_cache = quotas_cache
141141
if zones_cache is None:
142142
zones_cache = ComputeCache(cache=Cache(maxsize=10))
@@ -154,10 +154,7 @@ def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability
154154
extra_filter=_supported_instances,
155155
)
156156
regions = list(set(i.region for i in offers))
157-
with self._regions_to_quotas_cache.execution_lock:
158-
# Cache lock does not prevent concurrent execution.
159-
# We use a separate lock to avoid requesting quotas in parallel and hitting rate limits.
160-
regions_to_quotas = self._get_regions_to_quotas(self.session, regions)
157+
regions_to_quotas = self._get_regions_to_quotas(self.session, regions)
161158
regions_to_zones = self._get_regions_to_zones(self.session, regions)
162159

163160
availability_offers = []

src/dstack/_internal/core/backends/base/compute.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ class ComputeWithAllOffersCached(ABC):
179179
def __init__(self) -> None:
180180
super().__init__()
181181
self._offers_cache_lock = threading.Lock()
182+
self._offers_cache_execution_lock = threading.Lock()
182183
self._offers_cache = TTLCache(maxsize=1, ttl=180)
183184

184185
@abstractmethod
@@ -206,7 +207,10 @@ def get_offers_post_filter(
206207
return None
207208

208209
def get_offers(self, requirements: Requirements) -> Iterator[InstanceOfferWithAvailability]:
209-
cached_offers = self._get_all_offers_with_availability_cached()
210+
with self._offers_cache_execution_lock:
211+
# Cache lock does not prevent concurrent execution.
212+
# We use a separate lock to avoid requesting offers in parallel, re-doing the work and hitting rate limits.
213+
cached_offers = self._get_all_offers_with_availability_cached()
210214
offers = self.__apply_modifiers(cached_offers, self.get_offers_modifiers(requirements))
211215
offers = filter_offers_by_requirements(offers, requirements)
212216
post_filter = self.get_offers_post_filter(requirements)

0 commit comments

Comments
 (0)