Skip to content

Commit 86f9d8d

Browse files
committed
frontend: kraken: move extension endpoints to zenoh
1 parent a60ae00 commit 86f9d8d

2 files changed

Lines changed: 175 additions & 115 deletions

File tree

core/frontend/src/components/kraken/KrakenManager.ts

Lines changed: 141 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { QueryTarget, Sample, Subscriber } from '@eclipse-zenoh/zenoh-ts'
2+
13
import zenoh from '@/libs/zenoh'
24
import {
35
ExtensionData,
@@ -9,25 +11,13 @@ import {
911
UploadProgressEvent,
1012
} from '@/types/kraken'
1113
import back_axios from '@/utils/api'
12-
import { QueryTarget, Sample, Subscriber } from '@eclipse-zenoh/zenoh-ts'
14+
import { createDeferred } from '@/utils/deferred'
1315

1416
const KRAKEN_BASE_URL = '/kraken'
1517
const KRAKEN_API_V2_URL = `${KRAKEN_BASE_URL}/v2.0`
1618
const KRAKEN_BASE_ZENOH = 'kraken'
19+
const INSTALL_PROGRESS_TOPIC = `${KRAKEN_BASE_ZENOH}/extension/install/progress`
1720

18-
/**
19-
* List details of all installed extensions.
20-
* @returns {Promise<InstalledExtensionData[]>}
21-
*/
22-
export async function fetchInstalledExtensions(): Promise<InstalledExtensionData[]> {
23-
const response = await back_axios({
24-
method: 'get',
25-
url: `${KRAKEN_API_V2_URL}/extension/`,
26-
timeout: 10000,
27-
})
28-
29-
return response.data as InstalledExtensionData[]
30-
}
3121

3222
/**
3323
* List all manifest sources from kraken, uses API v2
@@ -189,78 +179,160 @@ export async function setManifestSourceOrder(identifier: string, order: number):
189179
})
190180
}
191181

182+
function buildInstallQueryKey(identifier: string, tag: string | undefined, stable: boolean): string {
183+
let key = `${KRAKEN_BASE_ZENOH}/extension/install?identifier=${encodeURIComponent(identifier)}`
184+
if (tag) key += `;tag=${encodeURIComponent(tag)}`
185+
if (!stable) key += ';stable=false'
186+
return key
187+
}
188+
189+
type InstallSample =
190+
| { kind: 'error'; message: string }
191+
| { kind: 'complete' }
192+
| { kind: 'progress'; raw: string }
193+
| null
194+
195+
function parseInstallSample(raw: string, identifier: string): InstallSample {
196+
let data: { identifier?: string; status?: string; error?: string }
197+
try {
198+
data = JSON.parse(raw)
199+
} catch {
200+
return null
201+
}
202+
if (data.identifier !== identifier) return null
203+
if (data.error) return { kind: 'error', message: data.error }
204+
if (data.status === 'complete') return { kind: 'complete' }
205+
return { kind: 'progress', raw }
206+
}
207+
192208
/**
193-
* Install an extension to the latest version available
194-
* @param {InstalledExtensionData} extension The extension to be installed
209+
* Install an extension to the latest version available.
210+
* The backend publishes the pull progress on `INSTALL_PROGRESS_TOPIC`.
211+
*
212+
* @param {string} identifier The identifier of the extension
195213
* @param {function} progressHandler The progress handler for the download
214+
* @param {string} tag The tag of the extension
215+
* @param {boolean} stable If true, will install the latest stable version, default is true
216+
* @param {number} timeout The timeout for the install
196217
*/
197218
export async function installExtension(
198-
extension: InstalledExtensionData,
199-
progressHandler: (event: any) => void,
219+
identifier: string,
220+
progressHandler?: (fragment: string) => void,
221+
tag?: string,
222+
stable = true,
223+
timeout = 600000,
200224
): Promise<void> {
201-
await back_axios({
202-
url: `${KRAKEN_API_V2_URL}/extension/install`,
203-
method: 'POST',
204-
data: {
205-
identifier: extension.identifier,
206-
name: extension.name,
207-
docker: extension.docker,
208-
tag: extension.tag,
209-
enabled: true,
210-
permissions: extension?.permissions ?? '',
211-
user_permissions: extension?.user_permissions ?? '',
212-
},
213-
timeout: 600000,
214-
onDownloadProgress: progressHandler,
215-
})
225+
const deferred = createDeferred<void>()
226+
let subscriber: Subscriber | null = null
227+
let timer: ReturnType<typeof setTimeout> | null = null
228+
229+
async function cleanup(): Promise<void> {
230+
if (timer !== null) {
231+
clearTimeout(timer)
232+
timer = null
233+
}
234+
try {
235+
await subscriber?.undeclare()
236+
} catch {
237+
// The subscriber may already be gone. Ignore cleanup errors.
238+
}
239+
subscriber = null
240+
}
241+
242+
async function handleSample(sample: Sample): Promise<void> {
243+
const result = parseInstallSample(sample.payload().to_string(), identifier)
244+
if (result === null) return
245+
switch (result.kind) {
246+
case 'error':
247+
cleanup().finally(() => deferred.reject(new Error(result.message)))
248+
break
249+
case 'complete':
250+
cleanup().finally(() => deferred.resolve())
251+
break
252+
case 'progress':
253+
progressHandler?.(result.raw)
254+
break
255+
default:
256+
break
257+
}
258+
}
259+
260+
// Subscribe before triggering the install.
261+
subscriber = await zenoh.subscriber(INSTALL_PROGRESS_TOPIC, handleSample)
262+
if (!subscriber) {
263+
throw new Error('Failed to subscribe to install progress topic')
264+
}
265+
timer = setTimeout(
266+
() => cleanup().finally(() => deferred.reject(new Error(`Install timed out after ${timeout}ms`))),
267+
timeout,
268+
)
269+
270+
try {
271+
const reply = await zenoh.query(
272+
buildInstallQueryKey(identifier, tag, stable),
273+
QueryTarget.BestMatching,
274+
timeout,
275+
)
276+
if (!reply || reply.error) {
277+
throw new Error(reply?.error ?? 'Install query failed')
278+
}
279+
} catch (error) {
280+
await cleanup()
281+
throw error
282+
}
283+
284+
return deferred.promise
216285
}
217286

218287
/**
219-
* Enable an extension by its identifier and tag, uses API v2
288+
* Enable an extension by its identifier and tag, uses zenoh
220289
* @param {string} identifier The identifier of the extension
221290
* @param {string} tag The tag of the extension
222291
*/
223292
export async function enableExtension(identifier: string, tag: string): Promise<void> {
224-
await back_axios({
225-
method: 'POST',
226-
url: `${KRAKEN_API_V2_URL}/extension/${identifier}/${tag}/enable`,
227-
timeout: 10000,
228-
})
293+
await zenoh.query(
294+
`${KRAKEN_BASE_ZENOH}/extension/enable?identifier=${identifier};tag=${tag}`,
295+
QueryTarget.BestMatching,
296+
10000,
297+
)
229298
}
230299

231300
/**
232-
* Disable an extension by its identifier, uses API v2
301+
* Disable an extension by its identifier, uses zenoh
233302
* @param {string} identifier The identifier of the extension
234303
*/
235304
export async function disableExtension(identifier: string): Promise<void> {
236-
await back_axios({
237-
method: 'POST',
238-
url: `${KRAKEN_API_V2_URL}/extension/${identifier}/disable`,
239-
timeout: 10000,
240-
})
305+
await zenoh.query(
306+
`${KRAKEN_BASE_ZENOH}/extension/disable?identifier=${identifier}`,
307+
QueryTarget.BestMatching,
308+
10000,
309+
)
241310
}
242311

243312
/**
244-
* Uninstall an extension by its identifier, uses API v2
313+
* Uninstall an extension by its identifier, uses zenoh
245314
* @param {string} identifier The identifier of the extension
246315
*/
247-
export async function uninstallExtension(identifier: string): Promise<void> {
248-
await back_axios({
249-
method: 'DELETE',
250-
url: `${KRAKEN_API_V2_URL}/extension/${identifier}`,
251-
})
316+
export async function uninstallExtension(identifier: string, tag?: string): Promise<void> {
317+
let queryKey = `${KRAKEN_BASE_ZENOH}/extension/uninstall?identifier=${identifier}`
318+
if (tag) queryKey += `;tag=${tag}`
319+
320+
await zenoh.query(
321+
queryKey,
322+
QueryTarget.BestMatching,
323+
)
252324
}
253325

254326
/**
255-
* Restart an extension by its identifier, uses API v2
327+
* Restart an extension by its identifier, uses zenoh
256328
* @param {string} identifier The identifier of the extension
257329
*/
258330
export async function restartExtension(identifier: string): Promise<void> {
259-
await back_axios({
260-
method: 'POST',
261-
url: `${KRAKEN_API_V2_URL}/extension/${identifier}/restart`,
262-
timeout: 10000,
263-
})
331+
await zenoh.query(
332+
`${KRAKEN_BASE_ZENOH}/extension/restart?identifier=${identifier}`,
333+
QueryTarget.BestMatching,
334+
10000,
335+
)
264336
}
265337

266338
/**
@@ -283,16 +355,15 @@ export async function updateExtensionToVersion(
283355
}
284356

285357
/**
286-
* List all installed extensions from kraken, uses API v2
358+
* List details of all installed extensions.
359+
* @returns {Promise<InstalledExtensionData[]> | null}
287360
*/
288-
export async function getInstalledExtensions(): Promise<InstalledExtensionData[]> {
289-
const response = await back_axios({
290-
method: 'GET',
291-
url: `${KRAKEN_API_V2_URL}/extension/`,
292-
timeout: 30000,
293-
})
294-
295-
return response.data as InstalledExtensionData[]
361+
export async function fetchInstalledExtensions(): Promise<InstalledExtensionData[] | null> {
362+
return zenoh.query(
363+
`${KRAKEN_BASE_ZENOH}/extension/fetch`,
364+
QueryTarget.BestMatching,
365+
30000,
366+
)
296367
}
297368

298369
/**
@@ -349,11 +420,10 @@ export async function uploadExtensionTarFile(
349420
* @returns {Promise<void>}
350421
*/
351422
export async function keepTemporaryExtensionAlive(tempTag: string): Promise<void> {
352-
await back_axios({
353-
method: 'POST',
354-
url: `${KRAKEN_API_V2_URL}/extension/upload/keep-alive?temp_tag=${tempTag}`,
355-
timeout: 10000,
356-
})
423+
await zenoh.query(
424+
`${KRAKEN_BASE_ZENOH}/extension/upload/keep-alive?temp_tag=${tempTag}`,
425+
QueryTarget.BestMatching,
426+
)
357427
}
358428

359429
/**
@@ -421,9 +491,7 @@ export default {
421491
disabledManifestSource,
422492
setManifestSourcesOrders,
423493
setManifestSourceOrder,
424-
updateExtensionToVersion,
425494
installExtension,
426-
getInstalledExtensions,
427495
enableExtension,
428496
disableExtension,
429497
uninstallExtension,

0 commit comments

Comments
 (0)