Skip to content

Commit 0586b3d

Browse files
committed
kraken: defer state mutations until after image pull succeeds
1 parent 10ce693 commit 0586b3d

3 files changed

Lines changed: 27 additions & 32 deletions

File tree

core/services/kraken/api/v1/routers/extension.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
@extension_to_http_exception
2222
async def install_extension(body: ExtensionSource) -> StreamingResponse:
2323
extension = Extension(body)
24-
return StreamingResponse(streamer(extension.install(atomic=True)))
24+
return StreamingResponse(streamer(extension.install()))
2525

2626

2727
@extension_router_v1.post("/uninstall", status_code=status.HTTP_200_OK)

core/services/kraken/api/v2/routers/extension.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ async def install(body: ExtensionSource) -> StreamingResponse:
8383
can install incompatible extensions. Make sure to check the extension source before installing it.
8484
"""
8585
extension = Extension(body)
86-
return StreamingResponse(streamer(extension.install(atomic=True)))
86+
return StreamingResponse(streamer(extension.install()))
8787

8888

8989
@extension_router_v2.post("/{identifier}/install", status_code=status.HTTP_201_CREATED)
@@ -149,13 +149,13 @@ async def update_to_latest(identifier: str, purge: bool = True, stable: bool = T
149149

150150
@extension_router_v2.put("/{identifier}/{tag}", status_code=status.HTTP_200_OK)
151151
@extension_to_http_exception
152-
async def update_to_tag(identifier: str, tag: str, purge: bool = True) -> Response:
152+
async def update_to_tag(identifier: str, tag: str, purge: bool = True, should_enable: bool = True) -> Response:
153153
"""
154154
Update a given extension by its identifier and tag to latest version on the higher priority manifest and by default
155155
purge all other tags, if purge is set to false it will keep all other versions disabled only.
156156
"""
157157
extension = cast(Extension, await Extension.from_manifest(identifier, tag))
158-
return StreamingResponse(streamer(extension.update(purge)))
158+
return StreamingResponse(streamer(extension.update(purge, should_enable)))
159159

160160

161161
@extension_router_v2.delete("/{identifier}", status_code=status.HTTP_202_ACCEPTED)
@@ -254,4 +254,4 @@ async def finalize_extension(
254254
new_extension = await Extension.finalize_temporary_extension(temp_extension, body.identifier, body)
255255

256256
# Install the extension
257-
return StreamingResponse(streamer(new_extension.install(atomic=True)))
257+
return StreamingResponse(streamer(new_extension.install()))

core/services/kraken/extension/extension.py

Lines changed: 22 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -171,18 +171,17 @@ async def _disable_running_extension(self) -> Optional["Extension"]:
171171
except ExtensionNotRunning:
172172
return None
173173

174-
def _create_extension_settings(self) -> ExtensionSettings:
174+
def _create_extension_settings(self, should_enable: bool = True) -> ExtensionSettings:
175175
"""Create and save extension settings."""
176176
new_extension = ExtensionSettings(
177177
identifier=self.identifier,
178178
name=self.source.name,
179179
docker=self.source.docker,
180180
tag=self.tag,
181181
permissions=self.source.permissions,
182-
enabled=True,
182+
enabled=should_enable,
183183
user_permissions=self.source.user_permissions,
184184
)
185-
# Save in settings first, if the image fails to install it will try to fetch after in main kraken check loop
186185
self._save_settings(new_extension)
187186
return new_extension
188187

@@ -216,54 +215,50 @@ async def _pull_docker_image(self, docker_auth: Optional[str]) -> AsyncGenerator
216215
if self.digest:
217216
await client.images.tag(tag, f"{self.source.docker}:{self.tag}")
218217

218+
@staticmethod
219+
def _status_message(message: str) -> bytes:
220+
return json.dumps({"status": message}).encode("utf-8")
221+
219222
async def _clear_remaining_tags(self) -> None:
220223
"""Uninstall all other tags for this extension."""
221224
logger.info(f"Clearing remaining tags for extension {self.identifier}")
222225
to_clear: List[Extension] = cast(List[Extension], await self.from_settings(self.identifier))
223226
to_clear = [version for version in to_clear if version.source.tag != self.tag]
224227
await asyncio.gather(*(version.uninstall() for version in to_clear))
225228

226-
async def install(self, clear_remaining_tags: bool = True, atomic: bool = False) -> AsyncGenerator[bytes, None]:
229+
async def install(
230+
self, clear_remaining_tags: bool = True, should_enable: bool = True
231+
) -> AsyncGenerator[bytes, None]:
227232
logger.info(f"Installing extension {self.identifier}:{self.tag}")
228233

229-
# First we should make sure no other tag is running
230-
running_ext = await self._disable_running_extension()
231-
232-
self._create_extension_settings()
233234
try:
234235
self.lock(self.unique_entry)
235236

236237
docker_auth = self._prepare_docker_auth()
237238
async for line in self._pull_docker_image(docker_auth):
238239
yield line
239240
except Exception as error:
240-
# In case of some external installs kraken shouldn't try to install it again so we remove from settings
241-
if atomic:
242-
should_raise = False
243-
if await self._image_is_available_locally():
244-
logger.info(f"Pull failed but image {self.identifier}:{self.tag} is already available locally")
245-
else:
246-
if not running_ext or self.unique_entry != running_ext.unique_entry:
247-
should_raise = True
248-
await self.uninstall()
249-
if running_ext:
250-
await running_ext.enable()
251-
252-
if should_raise:
253-
raise ExtensionPullFailed(f"Failed to pull extension {self.identifier}:{self.tag}") from error
254-
# Reached only if the extensions are the same, the change is in permissions, not installation failure.
255-
return
241+
if await self._image_is_available_locally():
242+
logger.info(f"Pull failed but image {self.identifier}:{self.tag} is already available locally")
243+
else:
244+
raise ExtensionPullFailed(f"Failed to pull extension {self.identifier}:{self.tag}") from error
256245
finally:
257246
self.unlock(self.unique_entry)
258247
self.reset_start_attempt(self.unique_entry)
259248

260-
logger.info(f"Extension {self.identifier}:{self.tag} installed")
249+
await self._disable_running_extension()
250+
self._create_extension_settings(should_enable)
251+
yield self._status_message(f"Extension {self.identifier}:{self.tag} registered")
252+
261253
# Uninstall all other tags in case user wants to clear them
262254
if clear_remaining_tags:
263255
await self._clear_remaining_tags()
256+
yield self._status_message(f"Previous versions of {self.identifier} cleared")
257+
258+
logger.info(f"Extension {self.identifier}:{self.tag} installed")
264259

265-
async def update(self, clear_remaining_tags: bool) -> AsyncGenerator[bytes, None]:
266-
async for data in self.install(clear_remaining_tags):
260+
async def update(self, clear_remaining_tags: bool, should_enable: bool = True) -> AsyncGenerator[bytes, None]:
261+
async for data in self.install(clear_remaining_tags, should_enable=should_enable):
267262
yield data
268263

269264
async def uninstall(self) -> None:

0 commit comments

Comments
 (0)