From 70ee2c382b2ae5c461aea48d7433771724aff435 Mon Sep 17 00:00:00 2001 From: ryjiang Date: Thu, 7 May 2026 14:30:23 +0800 Subject: [PATCH] feat: add volume manager api Signed-off-by: ryjiang --- milvus/bulkwriter/VolumeManager.ts | 195 ++++++++++++++++++++++++++ milvus/bulkwriter/index.ts | 1 + milvus/types.ts | 3 +- milvus/types/Volume.ts | 45 ++++++ milvus/types/index.ts | 1 + test/bulkwriter/VolumeManager.spec.ts | 167 ++++++++++++++++++++++ 6 files changed, 411 insertions(+), 1 deletion(-) create mode 100644 milvus/bulkwriter/VolumeManager.ts create mode 100644 milvus/types/Volume.ts create mode 100644 test/bulkwriter/VolumeManager.spec.ts diff --git a/milvus/bulkwriter/VolumeManager.ts b/milvus/bulkwriter/VolumeManager.ts new file mode 100644 index 00000000..9e0d58a1 --- /dev/null +++ b/milvus/bulkwriter/VolumeManager.ts @@ -0,0 +1,195 @@ +import { + DEFAULT_HTTP_TIMEOUT, + VolumeApplyReq, + VolumeCreateReq, + VolumeListReq, + VolumeManagerConfig, + VolumeNameReq, + VolumeRequestOptions, + VolumeResponse, +} from '../'; + +export class VolumeManager { + private readonly cloudEndpoint: string; + private readonly apiKey: string; + private readonly fetchImpl: typeof fetch; + private readonly timeout: number; + + constructor(config: VolumeManagerConfig); + constructor(cloudEndpoint: string, apiKey: string, fetchImpl?: typeof fetch); + constructor( + configOrEndpoint: VolumeManagerConfig | string, + apiKey?: string, + fetchImpl?: typeof fetch + ) { + if (typeof configOrEndpoint === 'string') { + this.cloudEndpoint = configOrEndpoint; + this.apiKey = apiKey || ''; + this.fetchImpl = fetchImpl || fetch; + this.timeout = DEFAULT_HTTP_TIMEOUT; + } else { + this.cloudEndpoint = configOrEndpoint.cloudEndpoint; + this.apiKey = configOrEndpoint.apiKey; + this.fetchImpl = configOrEndpoint.fetch || fetch; + this.timeout = configOrEndpoint.timeout || DEFAULT_HTTP_TIMEOUT; + } + } + + async createVolume( + data: VolumeCreateReq, + options?: VolumeRequestOptions + ): Promise { + const { timeout, ...params } = data; + return this.POST('/v2/volumes/create', params, { + ...options, + timeout: options?.timeout ?? timeout ?? this.timeout, + }); + } + + async listVolumes( + data: VolumeListReq, + options?: VolumeRequestOptions + ): Promise { + const { timeout, ...params } = data; + return this.GET('/v2/volumes', params, { + ...options, + timeout: options?.timeout ?? timeout ?? this.timeout, + }); + } + + async deleteVolume( + data: VolumeNameReq, + options?: VolumeRequestOptions + ): Promise { + const { timeout, volumeName } = data; + return this.DELETE(`/v2/volumes/${encodeURIComponent(volumeName)}`, { + ...options, + timeout: options?.timeout ?? timeout ?? this.timeout, + }); + } + + async describeVolume( + data: VolumeNameReq, + options?: VolumeRequestOptions + ): Promise { + const { timeout, volumeName } = data; + return this.GET( + `/v2/volumes/${encodeURIComponent(volumeName)}`, + {}, + { + ...options, + timeout: options?.timeout ?? timeout ?? this.timeout, + } + ); + } + + async applyVolume( + data: VolumeApplyReq, + options?: VolumeRequestOptions + ): Promise { + const { timeout, ...params } = data; + return this.POST('/v2/volumes/apply', params, { + ...options, + timeout: options?.timeout ?? timeout ?? this.timeout, + }); + } + + private get headers() { + return { + Authorization: `Bearer ${this.apiKey}`, + Accept: 'application/json', + ContentType: 'application/json', + }; + } + + private get baseURL() { + return this.cloudEndpoint.replace(/\/+$/, ''); + } + + private async handleResponse(response: Response, url: string): Promise { + if (!response.ok) { + const errorText = await response.text().catch(() => ''); + throw new Error( + `HTTP ${response.status} ${response.statusText}: ${url}${ + errorText ? ` - ${errorText}` : '' + }` + ); + } + return response.json() as T; + } + + private async request( + method: 'GET' | 'POST' | 'DELETE', + path: string, + data?: Record, + options?: VolumeRequestOptions + ): Promise { + let id: ReturnType | undefined; + const timeout = options?.timeout ?? this.timeout; + const abortController = options?.abortController ?? new AbortController(); + + try { + id = setTimeout(() => abortController.abort(), timeout); + const url = this.buildUrl(path, method === 'GET' ? data : undefined); + const init: RequestInit = { + method, + headers: this.headers, + signal: abortController.signal, + }; + + if (method === 'POST') { + init.body = JSON.stringify(data || {}); + } + + const response = await this.fetchImpl(url, init); + return this.handleResponse(response, url); + } finally { + if (id !== undefined) { + clearTimeout(id); + } + } + } + + private buildUrl(path: string, params?: Record) { + const url = `${this.baseURL}${path}`; + if (!params || Object.keys(params).length === 0) { + return url; + } + + const searchParams = new URLSearchParams(); + Object.entries(params).forEach(([key, value]) => { + if (value !== undefined && value !== null) { + searchParams.append(key, String(value)); + } + }); + + const query = searchParams.toString(); + return query ? `${url}?${query}` : url; + } + + private GET( + path: string, + params?: Record, + options?: VolumeRequestOptions + ) { + return this.request('GET', path, params, options); + } + + private POST( + path: string, + data?: Record, + options?: VolumeRequestOptions + ) { + return this.request('POST', path, data, options); + } + + private DELETE(path: string, options?: VolumeRequestOptions) { + return this.request('DELETE', path, undefined, options); + } + + create_volume = this.createVolume; + list_volumes = this.listVolumes; + delete_volume = this.deleteVolume; + describe_volume = this.describeVolume; + apply_volume = this.applyVolume; +} diff --git a/milvus/bulkwriter/index.ts b/milvus/bulkwriter/index.ts index f2993ad6..dc59aecb 100644 --- a/milvus/bulkwriter/index.ts +++ b/milvus/bulkwriter/index.ts @@ -1,4 +1,5 @@ export { BulkWriter } from './BulkWriter'; +export { VolumeManager } from './VolumeManager'; export { ColumnBuffer } from './ColumnBuffer'; export { JsonFormatter } from './JsonFormatter'; export { ParquetFormatter } from './ParquetFormatter'; diff --git a/milvus/types.ts b/milvus/types.ts index 41483838..8f0bfab3 100644 --- a/milvus/types.ts +++ b/milvus/types.ts @@ -13,4 +13,5 @@ export * from './types/Http'; export * from './types/Segments'; export * from './types/Insert'; export * from './types/Search'; -export * from './types/DataTypes'; \ No newline at end of file +export * from './types/DataTypes'; +export * from './types/Volume'; diff --git a/milvus/types/Volume.ts b/milvus/types/Volume.ts new file mode 100644 index 00000000..1c72b986 --- /dev/null +++ b/milvus/types/Volume.ts @@ -0,0 +1,45 @@ +import { FetchOptions, HttpBaseResponse } from './Http'; + +export enum VolumeType { + MANAGED = 'MANAGED', + EXTERNAL = 'EXTERNAL', +} + +export interface VolumeBaseReq { + timeout?: number; +} + +export interface VolumeCreateReq extends VolumeBaseReq { + projectId: string; + regionId: string; + volumeName: string; + type?: VolumeType | keyof typeof VolumeType | string; + storageIntegrationId?: string; + path?: string; +} + +export interface VolumeListReq extends VolumeBaseReq { + projectId: string; + currentPage?: number; + pageSize?: number; + type?: VolumeType | keyof typeof VolumeType | string; +} + +export interface VolumeNameReq extends VolumeBaseReq { + volumeName: string; +} + +export interface VolumeApplyReq extends VolumeNameReq { + path: string; +} + +export type VolumeResponse> = HttpBaseResponse; + +export interface VolumeManagerConfig { + cloudEndpoint: string; + apiKey: string; + fetch?: typeof fetch; + timeout?: number; +} + +export interface VolumeRequestOptions extends Partial {} diff --git a/milvus/types/index.ts b/milvus/types/index.ts index ddc9e140..c63d5fb3 100644 --- a/milvus/types/index.ts +++ b/milvus/types/index.ts @@ -17,3 +17,4 @@ export * from './Insert'; export * from './Search'; export * from './DataTypes'; export * from './GlobalCluster'; +export * from './Volume'; diff --git a/test/bulkwriter/VolumeManager.spec.ts b/test/bulkwriter/VolumeManager.spec.ts new file mode 100644 index 00000000..86246e84 --- /dev/null +++ b/test/bulkwriter/VolumeManager.spec.ts @@ -0,0 +1,167 @@ +import { VolumeManager, VolumeType } from '../../milvus'; + +describe('VolumeManager', () => { + const cloudEndpoint = 'https://api.cloud.zilliz.com'; + const apiKey = 'api-key'; + + const createManager = () => { + const requests: { url: string; init: RequestInit }[] = []; + const fetch = jest + .fn() + .mockImplementation(async (url: string, init: any) => { + requests.push({ url, init }); + return { + ok: true, + json: async () => ({ code: 0, data: { volumeName: 'vol-1' } }), + }; + }); + + return { + manager: new VolumeManager({ + cloudEndpoint, + apiKey, + fetch: fetch as any, + }), + requests, + }; + }; + + it('should create a managed volume', async () => { + const { manager, requests } = createManager(); + + const res = await manager.createVolume({ + projectId: 'proj-xxx', + regionId: 'aws-us-west-2', + volumeName: 'managed-volume', + }); + + expect(res.code).toBe(0); + expect(requests[0].url).toBe(`${cloudEndpoint}/v2/volumes/create`); + expect(requests[0].init.method).toBe('POST'); + expect(requests[0].init.headers).toEqual({ + Authorization: `Bearer ${apiKey}`, + Accept: 'application/json', + ContentType: 'application/json', + }); + expect(JSON.parse(requests[0].init.body as string)).toEqual({ + projectId: 'proj-xxx', + regionId: 'aws-us-west-2', + volumeName: 'managed-volume', + }); + }); + + it('should create an external volume', async () => { + const { manager, requests } = createManager(); + + await manager.createVolume({ + projectId: 'proj-xxx', + regionId: 'aws-us-west-2', + volumeName: 'external-volume', + type: VolumeType.EXTERNAL, + storageIntegrationId: 'si-xxx', + path: 's3://bucket/path/', + }); + + expect(JSON.parse(requests[0].init.body as string)).toEqual({ + projectId: 'proj-xxx', + regionId: 'aws-us-west-2', + volumeName: 'external-volume', + type: VolumeType.EXTERNAL, + storageIntegrationId: 'si-xxx', + path: 's3://bucket/path/', + }); + }); + + it('should list volumes with type filter', async () => { + const { manager, requests } = createManager(); + + await manager.listVolumes({ + projectId: 'proj-xxx', + currentPage: 2, + pageSize: 20, + type: VolumeType.EXTERNAL, + }); + + expect(requests[0].url).toBe( + `${cloudEndpoint}/v2/volumes?projectId=proj-xxx¤tPage=2&pageSize=20&type=EXTERNAL` + ); + expect(requests[0].init.method).toBe('GET'); + }); + + it('should describe a volume', async () => { + const { manager, requests } = createManager(); + + await manager.describeVolume({ volumeName: 'volume/name' }); + + expect(requests[0].url).toBe(`${cloudEndpoint}/v2/volumes/volume%2Fname`); + expect(requests[0].init.method).toBe('GET'); + }); + + it('should delete a volume', async () => { + const { manager, requests } = createManager(); + + await manager.deleteVolume({ volumeName: 'volume/name' }); + + expect(requests[0].url).toBe(`${cloudEndpoint}/v2/volumes/volume%2Fname`); + expect(requests[0].init.method).toBe('DELETE'); + }); + + it('should apply a volume', async () => { + const { manager, requests } = createManager(); + + await manager.applyVolume({ volumeName: 'volume-name', path: '/data/' }); + + expect(requests[0].url).toBe(`${cloudEndpoint}/v2/volumes/apply`); + expect(requests[0].init.method).toBe('POST'); + expect(JSON.parse(requests[0].init.body as string)).toEqual({ + volumeName: 'volume-name', + path: '/data/', + }); + }); + + it('should support constructor overload with endpoint and api key', async () => { + const requests: { url: string; init: RequestInit }[] = []; + const fetch = jest + .fn() + .mockImplementation(async (url: string, init: any) => { + requests.push({ url, init }); + return { + ok: true, + json: async () => ({ code: 0, data: {} }), + }; + }); + const manager = new VolumeManager( + `${cloudEndpoint}/`, + apiKey, + fetch as any + ); + + await manager.create_volume({ + projectId: 'proj-xxx', + regionId: 'aws-us-west-2', + volumeName: 'managed-volume', + }); + + expect(requests[0].url).toBe(`${cloudEndpoint}/v2/volumes/create`); + }); + + it('should throw for non-ok HTTP responses with response text', async () => { + const fetch = jest.fn().mockResolvedValue({ + ok: false, + status: 500, + statusText: 'Internal Server Error', + text: async () => 'boom', + }); + const manager = new VolumeManager({ + cloudEndpoint, + apiKey, + fetch: fetch as any, + }); + + await expect( + manager.describe_volume({ volumeName: 'volume-name' }) + ).rejects.toThrow( + `HTTP 500 Internal Server Error: ${cloudEndpoint}/v2/volumes/volume-name - boom` + ); + }); +});