From 911d3be8c160a081be8d96250759d5ca94909eb0 Mon Sep 17 00:00:00 2001 From: UmeshpJadhav Date: Thu, 29 Jan 2026 11:44:13 +0530 Subject: [PATCH 1/8] feat(mongodb): implement complete MongoDB memory adapter --- .changeset/mongodb-feature-storage.md | 5 + packages/mongodb/README.md | 40 + packages/mongodb/docker-compose.test.yaml | 18 + packages/mongodb/package.json | 55 + .../mongodb/src/index.integration.test.ts | 651 ++++++++++++ packages/mongodb/src/index.ts | 2 + packages/mongodb/src/memory-adapter.spec.ts | 976 +++++++++++++++++ packages/mongodb/src/memory-adapter.ts | 977 ++++++++++++++++++ packages/mongodb/tsconfig.json | 30 + packages/mongodb/tsup.config.ts | 19 + packages/mongodb/vitest.config.mts | 10 + .../mongodb/vitest.integration.config.mts | 10 + pnpm-lock.yaml | 103 +- 13 files changed, 2895 insertions(+), 1 deletion(-) create mode 100644 .changeset/mongodb-feature-storage.md create mode 100644 packages/mongodb/README.md create mode 100644 packages/mongodb/docker-compose.test.yaml create mode 100644 packages/mongodb/package.json create mode 100644 packages/mongodb/src/index.integration.test.ts create mode 100644 packages/mongodb/src/index.ts create mode 100644 packages/mongodb/src/memory-adapter.spec.ts create mode 100644 packages/mongodb/src/memory-adapter.ts create mode 100644 packages/mongodb/tsconfig.json create mode 100644 packages/mongodb/tsup.config.ts create mode 100644 packages/mongodb/vitest.config.mts create mode 100644 packages/mongodb/vitest.integration.config.mts diff --git a/.changeset/mongodb-feature-storage.md b/.changeset/mongodb-feature-storage.md new file mode 100644 index 000000000..59baf61ea --- /dev/null +++ b/.changeset/mongodb-feature-storage.md @@ -0,0 +1,5 @@ +--- +"@voltagent/mongodb": minor +--- + +Initial release of MongoDB memory storage adapter diff --git a/packages/mongodb/README.md b/packages/mongodb/README.md new file mode 100644 index 000000000..14232061e --- /dev/null +++ b/packages/mongodb/README.md @@ -0,0 +1,40 @@ +# @voltagent/mongodb + +MongoDB storage adapter for VoltAgent memory. + +## Installation + +```bash +npm install @voltagent/mongodb +``` + +## Usage + +```typescript +import { MongoDBMemoryAdapter } from "@voltagent/mongodb"; +import { Memory } from "@voltagent/core"; + +const memory = new Memory({ + storage: new MongoDBMemoryAdapter({ + connection: process.env.MONGO_URI, + database: "voltagent", // optional + collectionPrefix: "voltagent_memory", // optional + }), +}); +``` + +## Features + +- **Persistent Storage**: Stores messages, conversations, and workflow states in MongoDB. +- **Efficient Queries**: Indexed for fast retrieval by user, conversation, or date. +- **Type Safe**: Fully typed implementation of the VoltAgent StorageAdapter interface. +- **Workflow Support**: Native support for resuming suspended workflows. + +## Configuration + +| Option | Type | Default | Description | +| str | str | str | str | +| `connection` | `string` | required | MongoDB connection URI | +| `database` | `string` | `"voltagent"` | Database name | +| `collectionPrefix` | `string` | `"voltagent_memory"` | Prefix for collections | +| `debug` | `boolean` | `false` | Enable debug logging | diff --git a/packages/mongodb/docker-compose.test.yaml b/packages/mongodb/docker-compose.test.yaml new file mode 100644 index 000000000..0f5400cae --- /dev/null +++ b/packages/mongodb/docker-compose.test.yaml @@ -0,0 +1,18 @@ +services: + mongodb-test: + image: mongo:7.0 + container_name: 'voltagent-mongodb-test' + ports: + - '27017:27017' + environment: + MONGO_INITDB_DATABASE: voltagent_test + volumes: + - test_mongodb_data:/data/db + healthcheck: + test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"] + interval: 2s + timeout: 5s + retries: 10 + +volumes: + test_mongodb_data: diff --git a/packages/mongodb/package.json b/packages/mongodb/package.json new file mode 100644 index 000000000..1524a7f57 --- /dev/null +++ b/packages/mongodb/package.json @@ -0,0 +1,55 @@ +{ + "name": "@voltagent/mongodb", + "description": "VoltAgent MongoDB - MongoDB Memory provider integration for VoltAgent", + "version": "2.0.2", + "dependencies": { + "mongodb": "^7.0.0" + }, + "devDependencies": { + "@vitest/coverage-v8": "^3.2.4", + "@voltagent/core": "^2.0.2", + "ai": "^6.0.0" + }, + "exports": { + ".": { + "import": { + "types": "./dist/index.d.mts", + "default": "./dist/index.mjs" + }, + "require": { + "types": "./dist/index.d.ts", + "default": "./dist/index.js" + } + } + }, + "files": [ + "dist" + ], + "license": "MIT", + "main": "dist/index.js", + "module": "dist/index.mjs", + "peerDependencies": { + "@voltagent/core": "^2.0.0", + "ai": "^6.0.0" + }, + "repository": { + "type": "git", + "url": "https://github.com/VoltAgent/voltagent.git", + "directory": "packages/mongodb" + }, + "scripts": { + "attw": "attw --pack", + "build": "tsup", + "dev": "tsup --watch", + "lint": "biome check .", + "lint:fix": "biome check . --write", + "publint": "publint --strict", + "test": "vitest", + "test:coverage": "vitest run --coverage", + "test:integration": "npm run test:integration:setup && vitest run --config vitest.integration.config.mts && npm run test:integration:teardown", + "test:integration:ci": "vitest run --config vitest.integration.config.mts", + "test:integration:setup": "docker compose -f docker-compose.test.yaml up -d && sleep 10", + "test:integration:teardown": "docker compose -f docker-compose.test.yaml down -v" + }, + "types": "dist/index.d.ts" +} diff --git a/packages/mongodb/src/index.integration.test.ts b/packages/mongodb/src/index.integration.test.ts new file mode 100644 index 000000000..9991af3aa --- /dev/null +++ b/packages/mongodb/src/index.integration.test.ts @@ -0,0 +1,651 @@ +/** + * Integration tests for MongoDB Memory Storage Adapter + * Tests against real MongoDB instance running in Docker + */ + +import { ConversationAlreadyExistsError, ConversationNotFoundError } from "@voltagent/core"; +import type { UIMessage } from "ai"; +import { afterAll, beforeAll, beforeEach, describe, expect, it } from "vitest"; +import { MongoDBMemoryAdapter } from "./memory-adapter"; + +describe("MongoDBMemoryAdapter - Integration Tests", () => { + let adapter: MongoDBMemoryAdapter; + + const MONGO_URI = process.env.MONGO_URI || "mongodb://localhost:27017"; + const TEST_DATABASE = "voltagent_test"; + + beforeAll(async () => { + // Create adapter with test database + adapter = new MongoDBMemoryAdapter({ + connection: MONGO_URI, + database: TEST_DATABASE, + collectionPrefix: "test_memory", + debug: false, + }); + + // Wait for initialization + await new Promise((resolve) => setTimeout(resolve, 1000)); + }); + + beforeEach(async () => { + // Clean up all collections before each test + const { MongoClient } = await import("mongodb"); + const client = new MongoClient(MONGO_URI); + await client.connect(); + const db = client.db(TEST_DATABASE); + + const collections = await db.listCollections().toArray(); + for (const collection of collections) { + if (collection.name.startsWith("test_memory_")) { + await db.collection(collection.name).deleteMany({}); + } + } + + await client.close(); + }); + + afterAll(async () => { + // Close adapter connection + await adapter.close(); + }); + + // ============================================================================ + // Message Operations Integration Tests + // ============================================================================ + + describe("Message Operations", () => { + it("should add and retrieve messages", async () => { + // Create conversation first + const conversation = await adapter.createConversation({ + id: "conv-1", + resourceId: "resource-1", + userId: "user-1", + title: "Test Conversation", + metadata: {}, + }); + + expect(conversation.id).toBe("conv-1"); + + // Add message + const message: UIMessage = { + id: "msg-1", + role: "user", + parts: [{ type: "text", text: "Hello, world!" }], + metadata: { custom: "data" }, + }; + + await adapter.addMessage(message, "user-1", "conv-1"); + + // Retrieve messages + const messages = await adapter.getMessages("user-1", "conv-1"); + + expect(messages).toHaveLength(1); + expect(messages[0].id).toBe("msg-1"); + expect(messages[0].role).toBe("user"); + expect(messages[0].parts).toEqual(message.parts); + expect(messages[0].metadata?.createdAt).toBeInstanceOf(Date); + }); + + it("should add multiple messages in batch", async () => { + await adapter.createConversation({ + id: "conv-2", + resourceId: "resource-1", + userId: "user-1", + title: "Test", + metadata: {}, + }); + + const messages: UIMessage[] = [ + { id: "msg-1", role: "user", parts: [{ type: "text", text: "Hello" }] }, + { id: "msg-2", role: "assistant", parts: [{ type: "text", text: "Hi there!" }] }, + { id: "msg-3", role: "user", parts: [{ type: "text", text: "How are you?" }] }, + ]; + + await adapter.addMessages(messages, "user-1", "conv-2"); + + const retrieved = await adapter.getMessages("user-1", "conv-2"); + + expect(retrieved).toHaveLength(3); + expect(retrieved.map((m) => m.id)).toEqual(["msg-1", "msg-2", "msg-3"]); + }); + + it("should filter messages by role", async () => { + await adapter.createConversation({ + id: "conv-3", + resourceId: "resource-1", + userId: "user-1", + title: "Test", + metadata: {}, + }); + + await adapter.addMessages( + [ + { id: "msg-1", role: "user", parts: [{ type: "text", text: "Hello" }] }, + { id: "msg-2", role: "assistant", parts: [{ type: "text", text: "Hi" }] }, + { id: "msg-3", role: "user", parts: [{ type: "text", text: "How are you?" }] }, + ], + "user-1", + "conv-3", + ); + + const userMessages = await adapter.getMessages("user-1", "conv-3", { roles: ["user"] }); + + expect(userMessages).toHaveLength(2); + expect(userMessages.every((m) => m.role === "user")).toBe(true); + }); + + it("should clear messages for a conversation", async () => { + await adapter.createConversation({ + id: "conv-4", + resourceId: "resource-1", + userId: "user-1", + title: "Test", + metadata: {}, + }); + + await adapter.addMessage( + { id: "msg-1", role: "user", parts: [{ type: "text", text: "Hello" }] }, + "user-1", + "conv-4", + ); + + await adapter.clearMessages("user-1", "conv-4"); + + const messages = await adapter.getMessages("user-1", "conv-4"); + expect(messages).toHaveLength(0); + }); + + it("should throw error when adding message to non-existent conversation", async () => { + const message: UIMessage = { + id: "msg-1", + role: "user", + parts: [{ type: "text", text: "Hello" }], + }; + + await expect(adapter.addMessage(message, "user-1", "non-existent")).rejects.toThrow( + ConversationNotFoundError, + ); + }); + }); + + // ============================================================================ + // Conversation Operations Integration Tests + // ============================================================================ + + describe("Conversation Operations", () => { + it("should create and retrieve conversation", async () => { + const input = { + id: "conv-create-test", + resourceId: "resource-1", + userId: "user-1", + title: "Test Conversation", + metadata: { custom: "field" }, + }; + + const created = await adapter.createConversation(input); + + expect(created.id).toBe(input.id); + expect(created.title).toBe(input.title); + expect(created.metadata).toEqual(input.metadata); + expect(created.createdAt).toBeTypeOf("string"); + expect(created.updatedAt).toBeTypeOf("string"); + + const retrieved = await adapter.getConversation("conv-create-test"); + + expect(retrieved).not.toBeNull(); + expect(retrieved?.id).toBe(input.id); + }); + + it("should throw error when creating duplicate conversation", async () => { + const input = { + id: "conv-duplicate", + resourceId: "resource-1", + userId: "user-1", + title: "Test", + metadata: {}, + }; + + await adapter.createConversation(input); + + await expect(adapter.createConversation(input)).rejects.toThrow( + ConversationAlreadyExistsError, + ); + }); + + it("should update conversation", async () => { + await adapter.createConversation({ + id: "conv-update", + resourceId: "resource-1", + userId: "user-1", + title: "Original Title", + metadata: {}, + }); + + const updated = await adapter.updateConversation("conv-update", { + title: "Updated Title", + metadata: { updated: true }, + }); + + expect(updated.title).toBe("Updated Title"); + expect(updated.metadata.updated).toBe(true); + }); + + it("should delete conversation and cascade to messages", async () => { + await adapter.createConversation({ + id: "conv-delete", + resourceId: "resource-1", + userId: "user-1", + title: "To Delete", + metadata: {}, + }); + + await adapter.addMessage( + { id: "msg-1", role: "user", parts: [{ type: "text", text: "Hello" }] }, + "user-1", + "conv-delete", + ); + + await adapter.deleteConversation("conv-delete"); + + const conversation = await adapter.getConversation("conv-delete"); + expect(conversation).toBeNull(); + + const messages = await adapter.getMessages("user-1", "conv-delete"); + expect(messages).toHaveLength(0); + }); + + it("should query conversations with pagination", async () => { + // Create multiple conversations + for (let i = 1; i <= 5; i++) { + await adapter.createConversation({ + id: `conv-query-${i}`, + resourceId: "resource-1", + userId: "user-1", + title: `Conversation ${i}`, + metadata: {}, + }); + // Small delay to ensure different timestamps + await new Promise((resolve) => setTimeout(resolve, 10)); + } + + const page1 = await adapter.queryConversations({ + userId: "user-1", + limit: 2, + offset: 0, + }); + + expect(page1).toHaveLength(2); + + const page2 = await adapter.queryConversations({ + userId: "user-1", + limit: 2, + offset: 2, + }); + + expect(page2).toHaveLength(2); + expect(page1[0].id).not.toBe(page2[0].id); + }); + + it("should get conversations by resourceId", async () => { + await adapter.createConversation({ + id: "conv-res-1", + resourceId: "resource-test", + userId: "user-1", + title: "Test 1", + metadata: {}, + }); + + await adapter.createConversation({ + id: "conv-res-2", + resourceId: "resource-test", + userId: "user-2", + title: "Test 2", + metadata: {}, + }); + + const conversations = await adapter.getConversations("resource-test"); + + expect(conversations).toHaveLength(2); + }); + }); + + // ============================================================================ + // Working Memory Integration Tests + // ============================================================================ + + describe("Working Memory Operations", () => { + it("should set and get conversation-scoped working memory", async () => { + await adapter.createConversation({ + id: "conv-memory", + resourceId: "resource-1", + userId: "user-1", + title: "Test", + metadata: {}, + }); + + await adapter.setWorkingMemory({ + conversationId: "conv-memory", + content: "Important context about this conversation", + scope: "conversation", + }); + + const memory = await adapter.getWorkingMemory({ + conversationId: "conv-memory", + scope: "conversation", + }); + + expect(memory).toBe("Important context about this conversation"); + }); + + it("should set and get user-scoped working memory", async () => { + await adapter.setWorkingMemory({ + userId: "user-memory-test", + content: "User preferences and context", + scope: "user", + }); + + const memory = await adapter.getWorkingMemory({ + userId: "user-memory-test", + scope: "user", + }); + + expect(memory).toBe("User preferences and context"); + }); + + it("should delete working memory", async () => { + await adapter.createConversation({ + id: "conv-del-memory", + resourceId: "resource-1", + userId: "user-1", + title: "Test", + metadata: {}, + }); + + await adapter.setWorkingMemory({ + conversationId: "conv-del-memory", + content: "Test memory", + scope: "conversation", + }); + + await adapter.deleteWorkingMemory({ + conversationId: "conv-del-memory", + scope: "conversation", + }); + + const memory = await adapter.getWorkingMemory({ + conversationId: "conv-del-memory", + scope: "conversation", + }); + + expect(memory).toBeNull(); + }); + }); + + // ============================================================================ + // Workflow State Integration Tests + // ============================================================================ + + describe("Workflow State Operations", () => { + it("should set and get workflow state", async () => { + const state: any = { + id: "exec-1", + workflowId: "workflow-1", + workflowName: "Test Workflow", + status: "running", + suspension: null, + events: [], + output: null, + cancellation: null, + userId: "user-1", + conversationId: "conv-1", + metadata: {}, + createdAt: new Date(), + updatedAt: new Date(), + }; + + await adapter.setWorkflowState("exec-1", state); + + const retrieved = await adapter.getWorkflowState("exec-1"); + + expect(retrieved).not.toBeNull(); + expect(retrieved?.id).toBe("exec-1"); + expect(retrieved?.status).toBe("running"); + }); + + it("should update workflow state", async () => { + const state: any = { + id: "exec-update", + workflowId: "workflow-1", + workflowName: "Test", + status: "running", + createdAt: new Date(), + updatedAt: new Date(), + }; + + await adapter.setWorkflowState("exec-update", state); + + await adapter.updateWorkflowState("exec-update", { + status: "completed", + output: { result: "success" }, + }); + + const updated = await adapter.getWorkflowState("exec-update"); + + expect(updated?.status).toBe("completed"); + expect(updated?.output).toEqual({ result: "success" }); + }); + + it("should query workflow runs", async () => { + const now = new Date(); + + for (let i = 1; i <= 3; i++) { + await adapter.setWorkflowState(`exec-query-${i}`, { + id: `exec-query-${i}`, + workflowId: "workflow-query", + workflowName: "Test", + status: i === 1 ? "completed" : "running", + createdAt: now, + updatedAt: now, + } as any); + } + + const allRuns = await adapter.queryWorkflowRuns({ + workflowId: "workflow-query", + }); + + expect(allRuns.length).toBeGreaterThanOrEqual(3); + + const completedRuns = await adapter.queryWorkflowRuns({ + workflowId: "workflow-query", + status: "completed", + }); + + expect(completedRuns).toHaveLength(1); + }); + + it("should get suspended workflow states", async () => { + const now = new Date(); + + await adapter.setWorkflowState("exec-suspended-1", { + id: "exec-suspended-1", + workflowId: "workflow-suspend", + workflowName: "Test", + status: "suspended", + createdAt: now, + updatedAt: now, + } as any); + + await adapter.setWorkflowState("exec-running-1", { + id: "exec-running-1", + workflowId: "workflow-suspend", + workflowName: "Test", + status: "running", + createdAt: now, + updatedAt: now, + } as any); + + const suspended = await adapter.getSuspendedWorkflowStates("workflow-suspend"); + + expect(suspended).toHaveLength(1); + expect(suspended[0].status).toBe("suspended"); + }); + }); + + // ============================================================================ + // Conversation Steps Integration Tests + // ============================================================================ + + describe("Conversation Steps Operations", () => { + it("should save and retrieve conversation steps", async () => { + await adapter.createConversation({ + id: "conv-steps", + resourceId: "resource-1", + userId: "user-1", + title: "Test", + metadata: {}, + }); + + const steps: any[] = [ + { + id: "step-1", + conversationId: "conv-steps", + userId: "user-1", + agentId: "agent-1", + agentName: "Test Agent", + operationId: "op-1", + stepIndex: 0, + type: "message", + role: "user", + content: "Hello", + }, + { + id: "step-2", + conversationId: "conv-steps", + userId: "user-1", + agentId: "agent-1", + agentName: "Test Agent", + operationId: "op-1", + stepIndex: 1, + type: "message", + role: "assistant", + content: "Hi there", + }, + ]; + + await adapter.saveConversationSteps(steps); + + const retrieved = await adapter.getConversationSteps("user-1", "conv-steps"); + + expect(retrieved).toHaveLength(2); + expect(retrieved[0].stepIndex).toBe(0); + expect(retrieved[1].stepIndex).toBe(1); + }); + + it("should filter steps by operationId", async () => { + await adapter.createConversation({ + id: "conv-steps-filter", + resourceId: "resource-1", + userId: "user-1", + title: "Test", + metadata: {}, + }); + + await adapter.saveConversationSteps([ + { + id: "step-op1", + conversationId: "conv-steps-filter", + userId: "user-1", + agentId: "agent-1", + operationId: "op-1", + stepIndex: 0, + type: "message", + role: "user", + } as any, + { + id: "step-op2", + conversationId: "conv-steps-filter", + userId: "user-1", + agentId: "agent-1", + operationId: "op-2", + stepIndex: 1, + type: "message", + role: "user", + } as any, + ]); + + const op1Steps = await adapter.getConversationSteps("user-1", "conv-steps-filter", { + operationId: "op-1", + }); + + expect(op1Steps).toHaveLength(1); + expect(op1Steps[0].operationId).toBe("op-1"); + }); + }); + + // ============================================================================ + // Edge Cases and Error Handling + // ============================================================================ + + describe("Edge Cases", () => { + it("should handle empty message arrays", async () => { + await adapter.createConversation({ + id: "conv-empty", + resourceId: "resource-1", + userId: "user-1", + title: "Test", + metadata: {}, + }); + + await adapter.addMessages([], "user-1", "conv-empty"); + const messages = await adapter.getMessages("user-1", "conv-empty"); + + expect(messages).toHaveLength(0); + }); + + it("should return empty array for non-existent conversation messages", async () => { + const messages = await adapter.getMessages("user-1", "non-existent"); + expect(messages).toHaveLength(0); + }); + + it("should handle messages without IDs", async () => { + await adapter.createConversation({ + id: "conv-no-id", + resourceId: "resource-1", + userId: "user-1", + title: "Test", + metadata: {}, + }); + + const message: UIMessage = { + role: "user", + parts: [{ type: "text", text: "Hello" }], + id: "", + }; + + await adapter.addMessage(message, "user-1", "conv-no-id"); + + const messages = await adapter.getMessages("user-1", "conv-no-id"); + + expect(messages).toHaveLength(1); + expect(messages[0].id).toBeTruthy(); + }); + + it("should handle pagination beyond available results", async () => { + await adapter.createConversation({ + id: "conv-pagination", + resourceId: "resource-1", + userId: "user-1", + title: "Test", + metadata: {}, + }); + + const conversations = await adapter.queryConversations({ + userId: "user-1", + limit: 10, + offset: 100, + }); + + expect(conversations).toHaveLength(0); + }); + }); +}); diff --git a/packages/mongodb/src/index.ts b/packages/mongodb/src/index.ts new file mode 100644 index 000000000..cfebb019d --- /dev/null +++ b/packages/mongodb/src/index.ts @@ -0,0 +1,2 @@ +export { MongoDBMemoryAdapter } from "./memory-adapter"; +export type { MongoDBMemoryOptions } from "./memory-adapter"; diff --git a/packages/mongodb/src/memory-adapter.spec.ts b/packages/mongodb/src/memory-adapter.spec.ts new file mode 100644 index 000000000..c87fee035 --- /dev/null +++ b/packages/mongodb/src/memory-adapter.spec.ts @@ -0,0 +1,976 @@ +/** + * MongoDB Storage Adapter for Memory + * Stores conversations and messages in MongoDB database + */ + +import { ConversationAlreadyExistsError, ConversationNotFoundError } from "@voltagent/core"; +import type { + Conversation, + ConversationQueryOptions, + ConversationStepRecord, + CreateConversationInput, + GetConversationStepsOptions, + GetMessagesOptions, + StorageAdapter, + WorkflowRunQuery, + WorkflowStateEntry, + WorkingMemoryScope, +} from "@voltagent/core"; +import type { UIMessage } from "ai"; +import { type Collection, type Db, type Document, MongoClient } from "mongodb"; + +/** + * MongoDB configuration options for Memory + */ +export interface MongoDBMemoryOptions { + /** + * MongoDB connection URI + * Examples: + * - "mongodb://localhost:27017" + * - "mongodb://username:password@localhost:27017" + * - "mongodb+srv://username:password@cluster.mongodb.net" + */ + connection: string; + + /** + * Database name to use for collections + * @default "voltagent" + */ + database?: string; + + /** + * Prefix for collection names + * @default "voltagent_memory" + */ + collectionPrefix?: string; + + /** + * Whether to enable debug logging + * @default false + */ + debug?: boolean; +} + +/** + * MongoDB Storage Adapter for Memory + * Production-ready storage for conversations and messages + */ +export class MongoDBMemoryAdapter implements StorageAdapter { + private client: MongoClient; + private db: Db | null = null; + private databaseName: string; + private collectionPrefix: string; + private initialized = false; + private initPromise: Promise | null = null; + private debug: boolean; + + constructor(options: MongoDBMemoryOptions) { + this.databaseName = options.database ?? "voltagent"; + this.collectionPrefix = options.collectionPrefix ?? "voltagent_memory"; + this.debug = options.debug ?? false; + + // Validate collection prefix + if ( + this.collectionPrefix.includes("\0") || + this.collectionPrefix.includes("$") || + this.collectionPrefix.startsWith("system.") + ) { + throw new Error(`Invalid collection prefix: ${this.collectionPrefix}`); + } + + // Create MongoDB client + this.client = new MongoClient(options.connection); + + this.log("MongoDB Memory adapter initialized"); + + // Start initialization but don't await it + this.initPromise = this.initialize(); + } + + /** + * Log debug messages + */ + private log(...args: any[]): void { + if (this.debug) { + console.log("[MongoDB Memory]", ...args); + } + } + + /** + * Generate a random ID + */ + private generateId(): string { + return ( + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + ); + } + + /** + * Get collection by name + */ + private getCollection(collectionName: string): Collection { + if (!this.db) { + throw new Error("Database not initialized"); + } + return this.db.collection(`${this.collectionPrefix}_${collectionName}`); + } + + /** + * Initialize database schema + */ + private async initialize(): Promise { + if (this.initialized) return; + + // Prevent multiple simultaneous initializations + if (this.initPromise && !this.initialized) { + return this.initPromise; + } + + try { + // Connect to MongoDB + await this.client.connect(); + this.db = this.client.db(this.databaseName); + + this.log(`Connected to MongoDB database: ${this.databaseName}`); + + // Create indexes for all collections + const conversationsCollection = this.getCollection("conversations"); + const messagesCollection = this.getCollection("messages"); + const workflowStatesCollection = this.getCollection("workflow_states"); + const stepsCollection = this.getCollection("steps"); + + // Users collection indexes (none needed beyond _id) + + // Conversations collection indexes + await conversationsCollection.createIndex({ userId: 1 }, { background: true }); + await conversationsCollection.createIndex({ resourceId: 1 }, { background: true }); + await conversationsCollection.createIndex({ updatedAt: -1 }, { background: true }); + + // Messages collection indexes + await messagesCollection.createIndex( + { conversationId: 1, createdAt: 1 }, + { background: true }, + ); + await messagesCollection.createIndex({ conversationId: 1 }, { background: true }); + // Unique compound index to enforce message uniqueness + await messagesCollection.createIndex( + { conversationId: 1, messageId: 1 }, + { unique: true, background: true }, + ); + + // Workflow states collection indexes + await workflowStatesCollection.createIndex({ workflowId: 1 }, { background: true }); + await workflowStatesCollection.createIndex({ status: 1 }, { background: true }); + await workflowStatesCollection.createIndex({ createdAt: -1 }, { background: true }); + + // Steps collection indexes + await stepsCollection.createIndex({ conversationId: 1, stepIndex: 1 }, { background: true }); + await stepsCollection.createIndex( + { conversationId: 1, operationId: 1 }, + { background: true }, + ); + + this.initialized = true; + this.log("Database schema initialized with indexes"); + } catch (error) { + throw new Error( + `Failed to connect to MongoDB: ${error instanceof Error ? error.message : String(error)}`, + ); + } + } + + /** + * Close MongoDB connection + */ + async close(): Promise { + await this.client.close(); + this.log("MongoDB connection closed"); + } + + // ============================================================================ + // Message Operations + // ============================================================================ + + /** + * Add a single message + */ + async addMessage(message: UIMessage, userId: string, conversationId: string): Promise { + await this.initPromise; + + const messagesCollection = this.getCollection("messages"); + + // Ensure conversation exists + const conversation = await this.getConversation(conversationId); + if (!conversation) { + throw new ConversationNotFoundError(conversationId); + } + + const messageId = message.id || this.generateId(); + + try { + await messagesCollection.insertOne({ + _id: undefined, // Let MongoDB generate ObjectId + conversationId, + messageId, + userId, + role: message.role, + parts: message.parts, + metadata: message.metadata || {}, + formatVersion: 2, + createdAt: new Date(), + } as any); + + this.log(`Added message ${messageId} to conversation ${conversationId}`); + } catch (error: any) { + if (error.code === 11000) { + throw new Error( + `Message with ID ${messageId} already exists in conversation ${conversationId}`, + ); + } + throw error; + } + } + + /** + * Add multiple messages + */ + async addMessages(messages: UIMessage[], userId: string, conversationId: string): Promise { + await this.initPromise; + + if (messages.length === 0) return; + + const messagesCollection = this.getCollection("messages"); + + // Ensure conversation exists + const conversation = await this.getConversation(conversationId); + if (!conversation) { + throw new ConversationNotFoundError(conversationId); + } + + const documentsToInsert = messages.map((message) => ({ + _id: undefined, // Let MongoDB generate ObjectId + conversationId, + messageId: message.id || this.generateId(), + userId, + role: message.role, + parts: message.parts, + metadata: message.metadata || {}, + formatVersion: 2, + createdAt: new Date(), + })); + + try { + await messagesCollection.insertMany(documentsToInsert as any); + this.log(`Added ${messages.length} messages to conversation ${conversationId}`); + } catch (error: any) { + if (error.code === 11000) { + throw new Error(`One or more messages already exist in conversation ${conversationId}`); + } + throw error; + } + } + + /** + * Get messages for a conversation + */ + async getMessages( + userId: string, + conversationId: string, + options?: GetMessagesOptions, + ): Promise[]> { + await this.initPromise; + + const messagesCollection = this.getCollection("messages"); + + const filter: any = { conversationId, userId }; + + if (options?.roles && options.roles.length > 0) { + filter.role = { $in: options.roles }; + } + + if (options?.after) { + filter.createdAt = { $gt: options.after }; + } + + if (options?.before) { + filter.createdAt = { ...filter.createdAt, $lt: options.before }; + } + + let cursor = messagesCollection.find(filter).sort({ createdAt: 1 }); + + if (options?.limit) { + cursor = cursor.limit(options.limit); + } + + const messages = await cursor.toArray(); + + return messages.map((msg: any) => ({ + id: msg.messageId, + role: msg.role, + parts: msg.parts, + metadata: { + ...msg.metadata, + createdAt: msg.createdAt, + }, + })); + } + + /** + * Clear messages for a conversation or all conversations for a user + */ + async clearMessages(userId: string, conversationId?: string): Promise { + await this.initPromise; + + const messagesCollection = this.getCollection("messages"); + const stepsCollection = this.getCollection("steps"); + + if (conversationId) { + // Clear messages for specific conversation + await messagesCollection.deleteMany({ conversationId, userId }); + await stepsCollection.deleteMany({ conversationId, userId }); + this.log(`Cleared messages for conversation ${conversationId}`); + } else { + // Clear all messages for user + // First get all conversation IDs for this user + const conversationsCollection = this.getCollection("conversations"); + const userConversations = await conversationsCollection + .find({ userId }) + .project({ _id: 1 }) + .toArray(); + + const conversationIds = userConversations.map((conv: any) => conv._id); + + if (conversationIds.length > 0) { + await messagesCollection.deleteMany({ conversationId: { $in: conversationIds } }); + await stepsCollection.deleteMany({ conversationId: { $in: conversationIds } }); + this.log(`Cleared all messages for user ${userId}`); + } + } + } + + // ============================================================================ + // Conversation Operations + // ============================================================================ + + /** + * Create a new conversation + */ + async createConversation(input: CreateConversationInput): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + + // Check if conversation already exists + const existing = await conversationsCollection.findOne({ _id: input.id } as any); + if (existing) { + throw new ConversationAlreadyExistsError(input.id); + } + + const now = new Date(); + const conversation = { + _id: input.id, + resourceId: input.resourceId, + userId: input.userId, + title: input.title, + metadata: input.metadata || {}, + createdAt: now, + updatedAt: now, + }; + + await conversationsCollection.insertOne(conversation as any); + + this.log(`Created conversation ${input.id}`); + + return { + id: conversation._id, + resourceId: conversation.resourceId, + userId: conversation.userId, + title: conversation.title, + metadata: conversation.metadata, + createdAt: conversation.createdAt.toISOString(), + updatedAt: conversation.updatedAt.toISOString(), + }; + } + + /** + * Get a conversation by ID + */ + async getConversation(id: string): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + const conversation = await conversationsCollection.findOne({ _id: id } as any); + + if (!conversation) { + return null; + } + + return { + id: (conversation as any)._id, + resourceId: (conversation as any).resourceId, + userId: (conversation as any).userId, + title: (conversation as any).title, + metadata: (conversation as any).metadata || {}, + createdAt: (conversation as any).createdAt.toISOString(), + updatedAt: (conversation as any).updatedAt.toISOString(), + }; + } + + /** + * Get all conversations for a resource + */ + async getConversations(resourceId: string): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + const conversations = await conversationsCollection + .find({ resourceId } as any) + .sort({ updatedAt: -1 }) + .toArray(); + + return conversations.map((conv: any) => ({ + id: conv._id, + resourceId: conv.resourceId, + userId: conv.userId, + title: conv.title, + metadata: conv.metadata || {}, + createdAt: conv.createdAt.toISOString(), + updatedAt: conv.updatedAt.toISOString(), + })); + } + + /** + * Get all conversations for a user + */ + async getConversationsByUserId( + userId: string, + options?: Omit, + ): Promise { + return this.queryConversations({ ...options, userId }); + } + + /** + * Query conversations with filters + */ + async queryConversations(options: ConversationQueryOptions): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + + const filter: any = {}; + + if (options.userId) { + filter.userId = options.userId; + } + + if (options.resourceId) { + filter.resourceId = options.resourceId; + } + + let cursor = conversationsCollection.find(filter).sort({ updatedAt: -1 }); + + if (options.limit) { + cursor = cursor.limit(options.limit); + } + + if (options.offset) { + cursor = cursor.skip(options.offset); + } + + const conversations = await cursor.toArray(); + + return conversations.map((conv: any) => ({ + id: conv._id, + resourceId: conv.resourceId, + userId: conv.userId, + title: conv.title, + metadata: conv.metadata || {}, + createdAt: conv.createdAt.toISOString(), + updatedAt: conv.updatedAt.toISOString(), + })); + } + + /** + * Update a conversation + */ + async updateConversation( + id: string, + updates: Partial>, + ): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + + const updateDoc: any = { + updatedAt: new Date(), + }; + + if (updates.title !== undefined) { + updateDoc.title = updates.title; + } + + if (updates.metadata !== undefined) { + updateDoc.metadata = updates.metadata; + } + + if (updates.resourceId !== undefined) { + updateDoc.resourceId = updates.resourceId; + } + + if (updates.userId !== undefined) { + updateDoc.userId = updates.userId; + } + + const result = await conversationsCollection.findOneAndUpdate( + { _id: id } as any, + { $set: updateDoc }, + { returnDocument: "after" }, + ); + + if (!result) { + throw new ConversationNotFoundError(id); + } + + this.log(`Updated conversation ${id}`); + + return { + id: (result as any)._id, + resourceId: (result as any).resourceId, + userId: (result as any).userId, + title: (result as any).title, + metadata: (result as any).metadata || {}, + createdAt: (result as any).createdAt.toISOString(), + updatedAt: (result as any).updatedAt.toISOString(), + }; + } + + /** + * Delete a conversation + */ + async deleteConversation(id: string): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + + // MongoDB will cascade delete messages and steps automatically via application logic + const messagesCollection = this.getCollection("messages"); + const stepsCollection = this.getCollection("steps"); + + await messagesCollection.deleteMany({ conversationId: id } as any); + await stepsCollection.deleteMany({ conversationId: id } as any); + await conversationsCollection.deleteOne({ _id: id } as any); + + this.log(`Deleted conversation ${id}`); + } + + // ============================================================================ + // Conversation Steps Operations + // ============================================================================ + + /** + * Save conversation steps + */ + async saveConversationSteps(steps: ConversationStepRecord[]): Promise { + await this.initPromise; + + if (steps.length === 0) return; + + const stepsCollection = this.getCollection("steps"); + + const operations = steps.map((step) => ({ + replaceOne: { + filter: { _id: step.id || this.generateId() }, + replacement: { + _id: step.id || this.generateId(), + conversationId: step.conversationId, + userId: step.userId, + agentId: step.agentId, + agentName: step.agentName, + operationId: step.operationId, + stepIndex: step.stepIndex, + type: step.type, + role: step.role, + content: step.content, + arguments: step.arguments, + result: step.result, + usage: step.usage, + subAgentId: step.subAgentId, + subAgentName: step.subAgentName, + createdAt: new Date(), + }, + upsert: true, + }, + })); + + await stepsCollection.bulkWrite(operations as any); + + this.log(`Saved ${steps.length} conversation steps`); + } + + /** + * Get conversation steps + */ + async getConversationSteps( + userId: string, + conversationId: string, + options?: GetConversationStepsOptions, + ): Promise { + await this.initPromise; + + const stepsCollection = this.getCollection("steps"); + + const filter: any = { conversationId, userId }; + + if (options?.operationId) { + filter.operationId = options.operationId; + } + + let cursor = stepsCollection.find(filter).sort({ stepIndex: 1 }); + + if (options?.limit) { + cursor = cursor.limit(options.limit); + } + + const steps = await cursor.toArray(); + + return steps.map((step: any) => ({ + id: step._id, + conversationId: step.conversationId, + userId: step.userId, + agentId: step.agentId, + agentName: step.agentName, + operationId: step.operationId, + stepIndex: step.stepIndex, + type: step.type, + role: step.role, + content: step.content, + arguments: step.arguments, + result: step.result, + usage: step.usage, + subAgentId: step.subAgentId, + subAgentName: step.subAgentName, + createdAt: step.createdAt.toISOString(), + })); + } + + // ============================================================================ + // Working Memory Operations + // ============================================================================ + + /** + * Get working memory + */ + async getWorkingMemory(params: { + conversationId?: string; + userId?: string; + scope: WorkingMemoryScope; + }): Promise { + await this.initPromise; + + if (params.scope === "conversation" && params.conversationId) { + const conversationsCollection = this.getCollection("conversations"); + const conversation = await conversationsCollection.findOne({ + _id: params.conversationId, + } as any); + + if (!conversation) { + return null; + } + + const workingMemory = (conversation as any).metadata?.workingMemory; + return workingMemory || null; + } + + if (params.scope === "user" && params.userId) { + const usersCollection = this.getCollection("users"); + const user = await usersCollection.findOne({ _id: params.userId } as any); + + if (!user) { + return null; + } + + const workingMemory = (user as any).metadata?.workingMemory; + return workingMemory || null; + } + + return null; + } + + /** + * Set working memory + */ + async setWorkingMemory(params: { + conversationId?: string; + userId?: string; + content: string; + scope: WorkingMemoryScope; + }): Promise { + await this.initPromise; + + if (params.scope === "conversation" && params.conversationId) { + const conversationsCollection = this.getCollection("conversations"); + + const conversation = await conversationsCollection.findOne({ + _id: params.conversationId, + } as any); + if (!conversation) { + throw new ConversationNotFoundError(params.conversationId); + } + + await conversationsCollection.updateOne({ _id: params.conversationId } as any, { + $set: { + "metadata.workingMemory": params.content, + updatedAt: new Date(), + }, + }); + + this.log(`Set working memory for conversation ${params.conversationId}`); + } else if (params.scope === "user" && params.userId) { + const usersCollection = this.getCollection("users"); + + // Upsert user document with working memory + await usersCollection.updateOne( + { _id: params.userId } as any, + { + $set: { + "metadata.workingMemory": params.content, + updatedAt: new Date(), + }, + $setOnInsert: { + createdAt: new Date(), + }, + }, + { upsert: true }, + ); + + this.log(`Set working memory for user ${params.userId}`); + } + } + + /** + * Delete working memory + */ + async deleteWorkingMemory(params: { + conversationId?: string; + userId?: string; + scope: WorkingMemoryScope; + }): Promise { + await this.initPromise; + + if (params.scope === "conversation" && params.conversationId) { + const conversationsCollection = this.getCollection("conversations"); + + await conversationsCollection.updateOne({ _id: params.conversationId } as any, { + $unset: { "metadata.workingMemory": "" }, + $set: { updatedAt: new Date() }, + }); + + this.log(`Deleted working memory for conversation ${params.conversationId}`); + } else if (params.scope === "user" && params.userId) { + const usersCollection = this.getCollection("users"); + + await usersCollection.updateOne({ _id: params.userId } as any, { + $unset: { "metadata.workingMemory": "" }, + $set: { updatedAt: new Date() }, + }); + + this.log(`Deleted working memory for user ${params.userId}`); + } + } + + // ============================================================================ + // Workflow State Operations + // ============================================================================ + + /** + * Get workflow state by execution ID + */ + async getWorkflowState(executionId: string): Promise { + await this.initPromise; + + const workflowStatesCollection = this.getCollection("workflow_states"); + const state = await workflowStatesCollection.findOne({ _id: executionId } as any); + + if (!state) { + return null; + } + + return { + id: (state as any)._id, + workflowId: (state as any).workflowId, + workflowName: (state as any).workflowName, + status: (state as any).status, + suspension: (state as any).suspension, + events: (state as any).events, + output: (state as any).output, + cancellation: (state as any).cancellation, + userId: (state as any).userId, + conversationId: (state as any).conversationId, + metadata: (state as any).metadata, + createdAt: (state as any).createdAt.toISOString(), + updatedAt: (state as any).updatedAt.toISOString(), + }; + } + + /** + * Query workflow runs with filters + */ + async queryWorkflowRuns(query: WorkflowRunQuery): Promise { + await this.initPromise; + + const workflowStatesCollection = this.getCollection("workflow_states"); + + const filter: any = {}; + + if (query.workflowId) { + filter.workflowId = query.workflowId; + } + + if (query.status) { + filter.status = query.status; + } + + if (query.from) { + filter.createdAt = { $gte: query.from }; + } + + if (query.to) { + filter.createdAt = { ...filter.createdAt, $lte: query.to }; + } + + let cursor = workflowStatesCollection.find(filter).sort({ createdAt: -1 }); + + if (query.limit) { + cursor = cursor.limit(query.limit); + } + + if (query.offset) { + cursor = cursor.skip(query.offset); + } + + const states = await cursor.toArray(); + + return states.map((state: any) => ({ + id: state._id, + workflowId: state.workflowId, + workflowName: state.workflowName, + status: state.status, + suspension: state.suspension, + events: state.events, + output: state.output, + cancellation: state.cancellation, + userId: state.userId, + conversationId: state.conversationId, + metadata: state.metadata, + createdAt: state.createdAt.toISOString(), + updatedAt: state.updatedAt.toISOString(), + })); + } + + /** + * Set workflow state (create or replace) + */ + async setWorkflowState(executionId: string, state: WorkflowStateEntry): Promise { + await this.initPromise; + + const workflowStatesCollection = this.getCollection("workflow_states"); + + const now = new Date(); + + await workflowStatesCollection.replaceOne( + { _id: executionId } as any, + { + _id: executionId, + workflowId: state.workflowId, + workflowName: state.workflowName, + status: state.status, + suspension: state.suspension, + events: state.events, + output: state.output, + cancellation: state.cancellation, + userId: state.userId, + conversationId: state.conversationId, + metadata: state.metadata, + createdAt: state.createdAt ? new Date(state.createdAt) : now, + updatedAt: now, + } as any, + { upsert: true }, + ); + + this.log(`Set workflow state ${executionId}`); + } + + /** + * Update workflow state (partial update) + */ + async updateWorkflowState( + executionId: string, + updates: Partial, + ): Promise { + await this.initPromise; + + const workflowStatesCollection = this.getCollection("workflow_states"); + + const updateDoc: any = { + updatedAt: new Date(), + }; + + if (updates.status !== undefined) { + updateDoc.status = updates.status; + } + + if (updates.suspension !== undefined) { + updateDoc.suspension = updates.suspension; + } + + if (updates.events !== undefined) { + updateDoc.events = updates.events; + } + + if (updates.output !== undefined) { + updateDoc.output = updates.output; + } + + if (updates.cancellation !== undefined) { + updateDoc.cancellation = updates.cancellation; + } + + if (updates.metadata !== undefined) { + updateDoc.metadata = updates.metadata; + } + + await workflowStatesCollection.updateOne({ _id: executionId } as any, { $set: updateDoc }); + + this.log(`Updated workflow state ${executionId}`); + } + + /** + * Get suspended workflow states + */ + async getSuspendedWorkflowStates(workflowId: string): Promise { + await this.initPromise; + + const workflowStatesCollection = this.getCollection("workflow_states"); + + const states = await workflowStatesCollection + .find({ workflowId, status: "suspended" } as any) + .sort({ createdAt: -1 }) + .toArray(); + + return states.map((state: any) => ({ + id: state._id, + workflowId: state.workflowId, + workflowName: state.workflowName, + status: state.status, + suspension: state.suspension, + events: state.events, + output: state.output, + cancellation: state.cancellation, + userId: state.userId, + conversationId: state.conversationId, + metadata: state.metadata, + createdAt: state.createdAt, + updatedAt: state.updatedAt, + })); + } +} diff --git a/packages/mongodb/src/memory-adapter.ts b/packages/mongodb/src/memory-adapter.ts new file mode 100644 index 000000000..b840d5559 --- /dev/null +++ b/packages/mongodb/src/memory-adapter.ts @@ -0,0 +1,977 @@ +/** + * MongoDB Storage Adapter for Memory + * Stores conversations and messages in MongoDB database + */ + +import { ConversationAlreadyExistsError, ConversationNotFoundError } from "@voltagent/core"; +import type { + Conversation, + ConversationQueryOptions, + ConversationStepRecord, + CreateConversationInput, + GetConversationStepsOptions, + GetMessagesOptions, + StorageAdapter, + WorkflowRunQuery, + WorkflowStateEntry, + WorkingMemoryScope, +} from "@voltagent/core"; +import type { UIMessage } from "ai"; +import { type Collection, type Db, type Document, MongoClient } from "mongodb"; + +/** + * MongoDB configuration options for Memory + */ +export interface MongoDBMemoryOptions { + /** + * MongoDB connection URI + * Examples: + * - "mongodb://localhost:27017" + * - "mongodb://username:password@localhost:27017" + * - "mongodb+srv://username:password@cluster.mongodb.net" + */ + connection: string; + + /** + * Database name to use for collections + * @default "voltagent" + */ + database?: string; + + /** + * Prefix for collection names + * @default "voltagent_memory" + */ + collectionPrefix?: string; + + /** + * Whether to enable debug logging + * @default false + */ + debug?: boolean; +} + +/** + * MongoDB Storage Adapter for Memory + * Production-ready storage for conversations and messages + */ +export class MongoDBMemoryAdapter implements StorageAdapter { + private client: MongoClient; + private db: Db | null = null; + private databaseName: string; + private collectionPrefix: string; + private initialized = false; + private initPromise: Promise | null = null; + private debug: boolean; + + constructor(options: MongoDBMemoryOptions) { + this.databaseName = options.database ?? "voltagent"; + this.collectionPrefix = options.collectionPrefix ?? "voltagent_memory"; + this.debug = options.debug ?? false; + + // Validate collection prefix + if ( + this.collectionPrefix.includes("\0") || + this.collectionPrefix.includes("$") || + this.collectionPrefix.startsWith("system.") + ) { + throw new Error(`Invalid collection prefix: ${this.collectionPrefix}`); + } + + // Create MongoDB client + this.client = new MongoClient(options.connection); + + this.log("MongoDB Memory adapter initialized"); + + // Start initialization but don't await it + this.initPromise = this.initialize(); + } + + /** + * Log debug messages + */ + private log(...args: any[]): void { + if (this.debug) { + console.log("[MongoDB Memory]", ...args); + } + } + + /** + * Generate a random ID + */ + private generateId(): string { + return ( + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + ); + } + + /** + * Get collection by name + */ + private getCollection(collectionName: string): Collection { + if (!this.db) { + throw new Error("Database not initialized"); + } + return this.db.collection(`${this.collectionPrefix}_${collectionName}`); + } + + /** + * Initialize database schema + */ + private async initialize(): Promise { + if (this.initialized) return; + + // Prevent multiple simultaneous initializations + if (this.initPromise && !this.initialized) { + return this.initPromise; + } + + try { + // Connect to MongoDB + await this.client.connect(); + this.db = this.client.db(this.databaseName); + + this.log(`Connected to MongoDB database: ${this.databaseName}`); + + // Create indexes for all collections + + const conversationsCollection = this.getCollection("conversations"); + const messagesCollection = this.getCollection("messages"); + const workflowStatesCollection = this.getCollection("workflow_states"); + const stepsCollection = this.getCollection("steps"); + + // Users collection indexes (none needed beyond _id) + + // Conversations collection indexes + await conversationsCollection.createIndex({ userId: 1 }, { background: true }); + await conversationsCollection.createIndex({ resourceId: 1 }, { background: true }); + await conversationsCollection.createIndex({ updatedAt: -1 }, { background: true }); + + // Messages collection indexes + await messagesCollection.createIndex( + { conversationId: 1, createdAt: 1 }, + { background: true }, + ); + await messagesCollection.createIndex({ conversationId: 1 }, { background: true }); + // Unique compound index to enforce message uniqueness + await messagesCollection.createIndex( + { conversationId: 1, messageId: 1 }, + { unique: true, background: true }, + ); + + // Workflow states collection indexes + await workflowStatesCollection.createIndex({ workflowId: 1 }, { background: true }); + await workflowStatesCollection.createIndex({ status: 1 }, { background: true }); + await workflowStatesCollection.createIndex({ createdAt: -1 }, { background: true }); + + // Steps collection indexes + await stepsCollection.createIndex({ conversationId: 1, stepIndex: 1 }, { background: true }); + await stepsCollection.createIndex( + { conversationId: 1, operationId: 1 }, + { background: true }, + ); + + this.initialized = true; + this.log("Database schema initialized with indexes"); + } catch (error) { + throw new Error( + `Failed to connect to MongoDB: ${error instanceof Error ? error.message : String(error)}`, + ); + } + } + + /** + * Close MongoDB connection + */ + async close(): Promise { + await this.client.close(); + this.log("MongoDB connection closed"); + } + + // ============================================================================ + // Message Operations + // ============================================================================ + + /** + * Add a single message + */ + async addMessage(message: UIMessage, userId: string, conversationId: string): Promise { + await this.initPromise; + + const messagesCollection = this.getCollection("messages"); + + // Ensure conversation exists + const conversation = await this.getConversation(conversationId); + if (!conversation) { + throw new ConversationNotFoundError(conversationId); + } + + const messageId = message.id || this.generateId(); + + try { + await messagesCollection.insertOne({ + _id: undefined, + conversationId, + messageId, + userId, + role: message.role, + parts: message.parts, + metadata: message.metadata || {}, + formatVersion: 2, + createdAt: new Date(), + } as any); + + this.log(`Added message ${messageId} to conversation ${conversationId}`); + } catch (error: any) { + if (error.code === 11000) { + throw new Error( + `Message with ID ${messageId} already exists in conversation ${conversationId}`, + ); + } + throw error; + } + } + + /** + * Add multiple messages + */ + async addMessages(messages: UIMessage[], userId: string, conversationId: string): Promise { + await this.initPromise; + + if (messages.length === 0) return; + + const messagesCollection = this.getCollection("messages"); + + // Ensure conversation exists + const conversation = await this.getConversation(conversationId); + if (!conversation) { + throw new ConversationNotFoundError(conversationId); + } + + const documentsToInsert = messages.map((message) => ({ + _id: undefined, // Let MongoDB generate ObjectId + conversationId, + messageId: message.id || this.generateId(), + userId, + role: message.role, + parts: message.parts, + metadata: message.metadata || {}, + formatVersion: 2, + createdAt: new Date(), + })); + + try { + await messagesCollection.insertMany(documentsToInsert as any); + this.log(`Added ${messages.length} messages to conversation ${conversationId}`); + } catch (error: any) { + if (error.code === 11000) { + throw new Error(`One or more messages already exist in conversation ${conversationId}`); + } + throw error; + } + } + + /** + * Get messages for a conversation + */ + async getMessages( + userId: string, + conversationId: string, + options?: GetMessagesOptions, + ): Promise[]> { + await this.initPromise; + + const messagesCollection = this.getCollection("messages"); + + const filter: any = { conversationId, userId }; + + if (options?.roles && options.roles.length > 0) { + filter.role = { $in: options.roles }; + } + + if (options?.after) { + filter.createdAt = { $gt: options.after }; + } + + if (options?.before) { + filter.createdAt = { ...filter.createdAt, $lt: options.before }; + } + + let cursor = messagesCollection.find(filter).sort({ createdAt: 1 }); + + if (options?.limit) { + cursor = cursor.limit(options.limit); + } + + const messages = await cursor.toArray(); + + return messages.map((msg: any) => ({ + id: msg.messageId, + role: msg.role, + parts: msg.parts, + metadata: { + ...msg.metadata, + createdAt: msg.createdAt, + }, + })); + } + + /** + * Clear messages for a conversation or all conversations for a user + */ + async clearMessages(userId: string, conversationId?: string): Promise { + await this.initPromise; + + const messagesCollection = this.getCollection("messages"); + const stepsCollection = this.getCollection("steps"); + + if (conversationId) { + // Clear messages for specific conversation + await messagesCollection.deleteMany({ conversationId, userId }); + await stepsCollection.deleteMany({ conversationId, userId }); + this.log(`Cleared messages for conversation ${conversationId}`); + } else { + // Clear all messages for user + // First get all conversation IDs for this user + const conversationsCollection = this.getCollection("conversations"); + const userConversations = await conversationsCollection + .find({ userId }) + .project({ _id: 1 }) + .toArray(); + + const conversationIds = userConversations.map((conv: any) => conv._id); + + if (conversationIds.length > 0) { + await messagesCollection.deleteMany({ conversationId: { $in: conversationIds } }); + await stepsCollection.deleteMany({ conversationId: { $in: conversationIds } }); + this.log(`Cleared all messages for user ${userId}`); + } + } + } + + // ============================================================================ + // Conversation Operations + // ============================================================================ + + /** + * Create a new conversation + */ + async createConversation(input: CreateConversationInput): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + + // Check if conversation already exists + const existing = await conversationsCollection.findOne({ _id: input.id } as any); + if (existing) { + throw new ConversationAlreadyExistsError(input.id); + } + + const now = new Date(); + const conversation = { + _id: input.id, + resourceId: input.resourceId, + userId: input.userId, + title: input.title, + metadata: input.metadata || {}, + createdAt: now, + updatedAt: now, + }; + + await conversationsCollection.insertOne(conversation as any); + + this.log(`Created conversation ${input.id}`); + + return { + id: conversation._id, + resourceId: conversation.resourceId, + userId: conversation.userId, + title: conversation.title, + metadata: conversation.metadata, + createdAt: conversation.createdAt.toISOString(), + updatedAt: conversation.updatedAt.toISOString(), + }; + } + + /** + * Get a conversation by ID + */ + async getConversation(id: string): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + const conversation = await conversationsCollection.findOne({ _id: id } as any); + + if (!conversation) { + return null; + } + + return { + id: (conversation as any)._id, + resourceId: (conversation as any).resourceId, + userId: (conversation as any).userId, + title: (conversation as any).title, + metadata: (conversation as any).metadata || {}, + createdAt: (conversation as any).createdAt.toISOString(), + updatedAt: (conversation as any).updatedAt.toISOString(), + }; + } + + /** + * Get all conversations for a resource + */ + async getConversations(resourceId: string): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + const conversations = await conversationsCollection + .find({ resourceId } as any) + .sort({ updatedAt: -1 }) + .toArray(); + + return conversations.map((conv: any) => ({ + id: conv._id, + resourceId: conv.resourceId, + userId: conv.userId, + title: conv.title, + metadata: conv.metadata || {}, + createdAt: conv.createdAt.toISOString(), + updatedAt: conv.updatedAt.toISOString(), + })); + } + + /** + * Get all conversations for a user + */ + async getConversationsByUserId( + userId: string, + options?: Omit, + ): Promise { + return this.queryConversations({ ...options, userId }); + } + + /** + * Query conversations with filters + */ + async queryConversations(options: ConversationQueryOptions): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + + const filter: any = {}; + + if (options.userId) { + filter.userId = options.userId; + } + + if (options.resourceId) { + filter.resourceId = options.resourceId; + } + + let cursor = conversationsCollection.find(filter).sort({ updatedAt: -1 }); + + if (options.limit) { + cursor = cursor.limit(options.limit); + } + + if (options.offset) { + cursor = cursor.skip(options.offset); + } + + const conversations = await cursor.toArray(); + + return conversations.map((conv: any) => ({ + id: conv._id, + resourceId: conv.resourceId, + userId: conv.userId, + title: conv.title, + metadata: conv.metadata || {}, + createdAt: conv.createdAt.toISOString(), + updatedAt: conv.updatedAt.toISOString(), + })); + } + + /** + * Update a conversation + */ + async updateConversation( + id: string, + updates: Partial>, + ): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + + const updateDoc: any = { + updatedAt: new Date(), + }; + + if (updates.title !== undefined) { + updateDoc.title = updates.title; + } + + if (updates.metadata !== undefined) { + updateDoc.metadata = updates.metadata; + } + + if (updates.resourceId !== undefined) { + updateDoc.resourceId = updates.resourceId; + } + + if (updates.userId !== undefined) { + updateDoc.userId = updates.userId; + } + + const result = await conversationsCollection.findOneAndUpdate( + { _id: id } as any, + { $set: updateDoc }, + { returnDocument: "after" }, + ); + + if (!result) { + throw new ConversationNotFoundError(id); + } + + this.log(`Updated conversation ${id}`); + + return { + id: (result as any)._id, + resourceId: (result as any).resourceId, + userId: (result as any).userId, + title: (result as any).title, + metadata: (result as any).metadata || {}, + createdAt: (result as any).createdAt.toISOString(), + updatedAt: (result as any).updatedAt.toISOString(), + }; + } + + /** + * Delete a conversation + */ + async deleteConversation(id: string): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + + // MongoDB will cascade delete messages and steps automatically via application logic + const messagesCollection = this.getCollection("messages"); + const stepsCollection = this.getCollection("steps"); + + await messagesCollection.deleteMany({ conversationId: id } as any); + await stepsCollection.deleteMany({ conversationId: id } as any); + await conversationsCollection.deleteOne({ _id: id } as any); + + this.log(`Deleted conversation ${id}`); + } + + // ============================================================================ + // Conversation Steps Operations + // ============================================================================ + + /** + * Save conversation steps + */ + async saveConversationSteps(steps: ConversationStepRecord[]): Promise { + await this.initPromise; + + if (steps.length === 0) return; + + const stepsCollection = this.getCollection("steps"); + + const operations = steps.map((step) => ({ + replaceOne: { + filter: { _id: step.id || this.generateId() }, + replacement: { + _id: step.id || this.generateId(), + conversationId: step.conversationId, + userId: step.userId, + agentId: step.agentId, + agentName: step.agentName, + operationId: step.operationId, + stepIndex: step.stepIndex, + type: step.type, + role: step.role, + content: step.content, + arguments: step.arguments, + result: step.result, + usage: step.usage, + subAgentId: step.subAgentId, + subAgentName: step.subAgentName, + createdAt: new Date(), + }, + upsert: true, + }, + })); + + await stepsCollection.bulkWrite(operations as any); + + this.log(`Saved ${steps.length} conversation steps`); + } + + /** + * Get conversation steps + */ + async getConversationSteps( + userId: string, + conversationId: string, + options?: GetConversationStepsOptions, + ): Promise { + await this.initPromise; + + const stepsCollection = this.getCollection("steps"); + + const filter: any = { conversationId, userId }; + + if (options?.operationId) { + filter.operationId = options.operationId; + } + + let cursor = stepsCollection.find(filter).sort({ stepIndex: 1 }); + + if (options?.limit) { + cursor = cursor.limit(options.limit); + } + + const steps = await cursor.toArray(); + + return steps.map((step: any) => ({ + id: step._id, + conversationId: step.conversationId, + userId: step.userId, + agentId: step.agentId, + agentName: step.agentName, + operationId: step.operationId, + stepIndex: step.stepIndex, + type: step.type, + role: step.role, + content: step.content, + arguments: step.arguments, + result: step.result, + usage: step.usage, + subAgentId: step.subAgentId, + subAgentName: step.subAgentName, + createdAt: step.createdAt.toISOString(), + })); + } + + // ============================================================================ + // Working Memory Operations + // ============================================================================ + + /** + * Get working memory + */ + async getWorkingMemory(params: { + conversationId?: string; + userId?: string; + scope: WorkingMemoryScope; + }): Promise { + await this.initPromise; + + if (params.scope === "conversation" && params.conversationId) { + const conversationsCollection = this.getCollection("conversations"); + const conversation = await conversationsCollection.findOne({ + _id: params.conversationId, + } as any); + + if (!conversation) { + return null; + } + + const workingMemory = (conversation as any).metadata?.workingMemory; + return workingMemory || null; + } + + if (params.scope === "user" && params.userId) { + const usersCollection = this.getCollection("users"); + const user = await usersCollection.findOne({ _id: params.userId } as any); + + if (!user) { + return null; + } + + const workingMemory = (user as any).metadata?.workingMemory; + return workingMemory || null; + } + + return null; + } + + /** + * Set working memory + */ + async setWorkingMemory(params: { + conversationId?: string; + userId?: string; + content: string; + scope: WorkingMemoryScope; + }): Promise { + await this.initPromise; + + if (params.scope === "conversation" && params.conversationId) { + const conversationsCollection = this.getCollection("conversations"); + + const conversation = await conversationsCollection.findOne({ + _id: params.conversationId, + } as any); + if (!conversation) { + throw new ConversationNotFoundError(params.conversationId); + } + + await conversationsCollection.updateOne({ _id: params.conversationId } as any, { + $set: { + "metadata.workingMemory": params.content, + updatedAt: new Date(), + }, + }); + + this.log(`Set working memory for conversation ${params.conversationId}`); + } else if (params.scope === "user" && params.userId) { + const usersCollection = this.getCollection("users"); + + // Upsert user document with working memory + await usersCollection.updateOne( + { _id: params.userId } as any, + { + $set: { + "metadata.workingMemory": params.content, + updatedAt: new Date(), + }, + $setOnInsert: { + createdAt: new Date(), + }, + }, + { upsert: true }, + ); + + this.log(`Set working memory for user ${params.userId}`); + } + } + + /** + * Delete working memory + */ + async deleteWorkingMemory(params: { + conversationId?: string; + userId?: string; + scope: WorkingMemoryScope; + }): Promise { + await this.initPromise; + + if (params.scope === "conversation" && params.conversationId) { + const conversationsCollection = this.getCollection("conversations"); + + await conversationsCollection.updateOne({ _id: params.conversationId } as any, { + $unset: { "metadata.workingMemory": "" }, + $set: { updatedAt: new Date() }, + }); + + this.log(`Deleted working memory for conversation ${params.conversationId}`); + } else if (params.scope === "user" && params.userId) { + const usersCollection = this.getCollection("users"); + + await usersCollection.updateOne({ _id: params.userId } as any, { + $unset: { "metadata.workingMemory": "" }, + $set: { updatedAt: new Date() }, + }); + + this.log(`Deleted working memory for user ${params.userId}`); + } + } + + // ============================================================================ + // Workflow State Operations + // ============================================================================ + + /** + * Get workflow state by execution ID + */ + async getWorkflowState(executionId: string): Promise { + await this.initPromise; + + const workflowStatesCollection = this.getCollection("workflow_states"); + const state = await workflowStatesCollection.findOne({ _id: executionId } as any); + + if (!state) { + return null; + } + + return { + id: (state as any)._id, + workflowId: (state as any).workflowId, + workflowName: (state as any).workflowName, + status: (state as any).status, + suspension: (state as any).suspension, + events: (state as any).events, + output: (state as any).output, + cancellation: (state as any).cancellation, + userId: (state as any).userId, + conversationId: (state as any).conversationId, + metadata: (state as any).metadata, + createdAt: (state as any).createdAt.toISOString(), + updatedAt: (state as any).updatedAt.toISOString(), + }; + } + + /** + * Query workflow runs with filters + */ + async queryWorkflowRuns(query: WorkflowRunQuery): Promise { + await this.initPromise; + + const workflowStatesCollection = this.getCollection("workflow_states"); + + const filter: any = {}; + + if (query.workflowId) { + filter.workflowId = query.workflowId; + } + + if (query.status) { + filter.status = query.status; + } + + if (query.from) { + filter.createdAt = { $gte: query.from }; + } + + if (query.to) { + filter.createdAt = { ...filter.createdAt, $lte: query.to }; + } + + let cursor = workflowStatesCollection.find(filter).sort({ createdAt: -1 }); + + if (query.limit) { + cursor = cursor.limit(query.limit); + } + + if (query.offset) { + cursor = cursor.skip(query.offset); + } + + const states = await cursor.toArray(); + + return states.map((state: any) => ({ + id: state._id, + workflowId: state.workflowId, + workflowName: state.workflowName, + status: state.status, + suspension: state.suspension, + events: state.events, + output: state.output, + cancellation: state.cancellation, + userId: state.userId, + conversationId: state.conversationId, + metadata: state.metadata, + createdAt: state.createdAt.toISOString(), + updatedAt: state.updatedAt.toISOString(), + })); + } + + /** + * Set workflow state (create or replace) + */ + async setWorkflowState(executionId: string, state: WorkflowStateEntry): Promise { + await this.initPromise; + + const workflowStatesCollection = this.getCollection("workflow_states"); + + const now = new Date(); + + await workflowStatesCollection.replaceOne( + { _id: executionId } as any, + { + _id: executionId, + workflowId: state.workflowId, + workflowName: state.workflowName, + status: state.status, + suspension: state.suspension, + events: state.events, + output: state.output, + cancellation: state.cancellation, + userId: state.userId, + conversationId: state.conversationId, + metadata: state.metadata, + createdAt: state.createdAt ? new Date(state.createdAt) : now, + updatedAt: now, + } as any, + { upsert: true }, + ); + + this.log(`Set workflow state ${executionId}`); + } + + /** + * Update workflow state (partial update) + */ + async updateWorkflowState( + executionId: string, + updates: Partial, + ): Promise { + await this.initPromise; + + const workflowStatesCollection = this.getCollection("workflow_states"); + + const updateDoc: any = { + updatedAt: new Date(), + }; + + if (updates.status !== undefined) { + updateDoc.status = updates.status; + } + + if (updates.suspension !== undefined) { + updateDoc.suspension = updates.suspension; + } + + if (updates.events !== undefined) { + updateDoc.events = updates.events; + } + + if (updates.output !== undefined) { + updateDoc.output = updates.output; + } + + if (updates.cancellation !== undefined) { + updateDoc.cancellation = updates.cancellation; + } + + if (updates.metadata !== undefined) { + updateDoc.metadata = updates.metadata; + } + + await workflowStatesCollection.updateOne({ _id: executionId } as any, { $set: updateDoc }); + + this.log(`Updated workflow state ${executionId}`); + } + + /** + * Get suspended workflow states + */ + async getSuspendedWorkflowStates(workflowId: string): Promise { + await this.initPromise; + + const workflowStatesCollection = this.getCollection("workflow_states"); + + const states = await workflowStatesCollection + .find({ workflowId, status: "suspended" } as any) + .sort({ createdAt: -1 }) + .toArray(); + + return states.map((state: any) => ({ + id: state._id, + workflowId: state.workflowId, + workflowName: state.workflowName, + status: state.status, + suspension: state.suspension, + events: state.events, + output: state.output, + cancellation: state.cancellation, + userId: state.userId, + conversationId: state.conversationId, + metadata: state.metadata, + createdAt: state.createdAt, + updatedAt: state.updatedAt, + })); + } +} diff --git a/packages/mongodb/tsconfig.json b/packages/mongodb/tsconfig.json new file mode 100644 index 000000000..fb0c6c836 --- /dev/null +++ b/packages/mongodb/tsconfig.json @@ -0,0 +1,30 @@ +{ + "compilerOptions": { + "target": "es2018", + "lib": ["dom", "dom.iterable", "esnext"], + "module": "esnext", + "moduleResolution": "node", + "declaration": true, + "declarationMap": true, + "sourceMap": true, + "outDir": "./dist", + "rootDir": "./", + "strict": true, + "noImplicitAny": true, + "strictNullChecks": true, + "strictFunctionTypes": true, + "strictBindCallApply": true, + "strictPropertyInitialization": true, + "noImplicitThis": true, + "noUnusedLocals": true, + "noUnusedParameters": true, + "noImplicitReturns": true, + "noFallthroughCasesInSwitch": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "types": ["node", "vitest/globals"] + }, + "include": ["src/**/*.ts", "__tests__/**/*.ts"], + "exclude": ["node_modules", "dist"] +} diff --git a/packages/mongodb/tsup.config.ts b/packages/mongodb/tsup.config.ts new file mode 100644 index 000000000..0819104fd --- /dev/null +++ b/packages/mongodb/tsup.config.ts @@ -0,0 +1,19 @@ +import { defineConfig } from "tsup"; +import { markAsExternalPlugin } from "../shared/tsup-plugins/mark-as-external"; + +export default defineConfig({ + entry: ["src/index.ts"], + format: ["cjs", "esm"], + splitting: false, + sourcemap: true, + clean: false, + target: "es2022", + outDir: "dist", + minify: false, + dts: true, + esbuildPlugins: [markAsExternalPlugin], + esbuildOptions(options) { + options.keepNames = true; + return options; + }, +}); diff --git a/packages/mongodb/vitest.config.mts b/packages/mongodb/vitest.config.mts new file mode 100644 index 000000000..cc2bb3aa7 --- /dev/null +++ b/packages/mongodb/vitest.config.mts @@ -0,0 +1,10 @@ +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + test: { + globals: true, + environment: "node", + include: ["src/**/*.{test,spec}.ts"], + exclude: ["src/**/*.integration.test.ts", "node_modules", "dist"], + }, +}); diff --git a/packages/mongodb/vitest.integration.config.mts b/packages/mongodb/vitest.integration.config.mts new file mode 100644 index 000000000..fc5c2ab76 --- /dev/null +++ b/packages/mongodb/vitest.integration.config.mts @@ -0,0 +1,10 @@ +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + test: { + globals: true, + environment: "node", + include: ["src/**/*.integration.test.ts"], + testTimeout: 30000, + }, +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9c77b482e..5f62cd597 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -4004,6 +4004,22 @@ importers: specifier: ^3.25.76 version: 3.25.76 + packages/mongodb: + dependencies: + mongodb: + specifier: ^7.0.0 + version: 7.0.0 + devDependencies: + '@vitest/coverage-v8': + specifier: ^3.2.4 + version: 3.2.4(vitest@3.2.4) + '@voltagent/core': + specifier: ^2.0.2 + version: link:../core + ai: + specifier: ^6.0.0 + version: 6.0.3(zod@4.2.1) + packages/postgres: dependencies: '@voltagent/internal': @@ -10979,6 +10995,12 @@ packages: os-filter-obj: 2.0.0 dev: true + /@mongodb-js/saslprep@1.4.4: + resolution: {integrity: sha512-p7X/ytJDIdwUfFL/CLOhKgdfJe1Fa8uw9seJYvdOmnP9JBWGWHW69HkOixXS6Wy9yvGf1MbhcS6lVmrhy4jm2g==} + dependencies: + sparse-bitfield: 3.0.3 + dev: false + /@mswjs/interceptors@0.40.0: resolution: {integrity: sha512-EFd6cVbHsgLa6wa4RljGj6Wk75qoHxUSyc5asLyyPSyuhIcdS2Q3Phw6ImS1q+CkALthJRShiYfKANcQMuMqsQ==} engines: {node: '>=18'} @@ -19120,6 +19142,16 @@ packages: resolution: {integrity: sha512-oIQLCGWtcFZy2JW77j9k8nHzAOpqMHLQejDA48XXMWH6tjCQHz5RCFz1bzsmROyL6PUm+LLnUiI4BCn221inxA==} dev: false + /@types/webidl-conversions@7.0.3: + resolution: {integrity: sha512-CiJJvcRtIgzadHCYXw7dqEnMNRjhGZlYK05Mj9OyktqV8uVT8fD2BFOB7S1uwBE3Kj2Z+4UyPmFw/Ixgw/LAlA==} + dev: false + + /@types/whatwg-url@13.0.0: + resolution: {integrity: sha512-N8WXpbE6Wgri7KUSvrmQcqrMllKZ9uxkYWMt+mCSGwNc0Hsw9VQTW7ApqI4XNrx6/SaM2QQJCzMPDEXE058s+Q==} + dependencies: + '@types/webidl-conversions': 7.0.3 + dev: false + /@types/wrap-ansi@3.0.0: resolution: {integrity: sha512-ltIpx+kM7g/MLRZfkbL7EsCEjfzCcScLpkg37eXEtx5kmrAKBkTJwd1GIAjDSL8wTpM6Hzn5YO4pSb91BEwu1g==} dev: false @@ -21781,6 +21813,11 @@ packages: node-int64: 0.4.0 dev: true + /bson@7.0.0: + resolution: {integrity: sha512-Kwc6Wh4lQ5OmkqqKhYGKIuELXl+EPYSCObVE6bWsp1T/cGkOCBN0I8wF/T44BiuhHyNi1mmKVPXk60d41xZ7kw==} + engines: {node: '>=20.19.0'} + dev: false + /buffer-crc32@0.2.13: resolution: {integrity: sha512-VO9Ht/+p3SN7SKWqcrgEzjGbRSJYTx+Q1pTQC0wrWqHx0vpJraQ6GtHx8tvcg1rlK1byhU5gccxgOgj7B0TDkQ==} dev: true @@ -30390,6 +30427,10 @@ packages: resolution: {integrity: sha512-rkpe71W0N0c0Xz6QD0eJETuWAJGnJ9afsl1srmwPrI+yBCkge5EycXXbYRyvL29zZVUWQCY7InPRCv3GDXuZNw==} dev: true + /memory-pager@1.5.0: + resolution: {integrity: sha512-ZS4Bp4r/Zoeq6+NLJpP+0Zzm0pR8whtGPf1XExKLJBAczGMnSi3It14OiNCStjQjM6NU1okjQGSxgEZN8eBYKg==} + dev: false + /meow@12.1.1: resolution: {integrity: sha512-BhXM0Au22RwUneMPwSCnyhTOizdWoIEPU9sp0Aqa1PnDMR5Wv2FGXYDjuzJEIX+Eo2Rb8xuYe5jrnm5QowQFkw==} engines: {node: '>=16.10'} @@ -31223,6 +31264,46 @@ packages: micro-memoize: 4.1.3 dev: true + /mongodb-connection-string-url@7.0.0: + resolution: {integrity: sha512-irhhjRVLE20hbkRl4zpAYLnDMM+zIZnp0IDB9akAFFUZp/3XdOfwwddc7y6cNvF2WCEtfTYRwYbIfYa2kVY0og==} + engines: {node: '>=20.19.0'} + dependencies: + '@types/whatwg-url': 13.0.0 + whatwg-url: 14.2.0 + dev: false + + /mongodb@7.0.0: + resolution: {integrity: sha512-vG/A5cQrvGGvZm2mTnCSz1LUcbOPl83hfB6bxULKQ8oFZauyox/2xbZOoGNl+64m8VBrETkdGCDBdOsCr3F3jg==} + engines: {node: '>=20.19.0'} + peerDependencies: + '@aws-sdk/credential-providers': ^3.806.0 + '@mongodb-js/zstd': ^7.0.0 + gcp-metadata: ^7.0.1 + kerberos: ^7.0.0 + mongodb-client-encryption: '>=7.0.0 <7.1.0' + snappy: ^7.3.2 + socks: ^2.8.6 + peerDependenciesMeta: + '@aws-sdk/credential-providers': + optional: true + '@mongodb-js/zstd': + optional: true + gcp-metadata: + optional: true + kerberos: + optional: true + mongodb-client-encryption: + optional: true + snappy: + optional: true + socks: + optional: true + dependencies: + '@mongodb-js/saslprep': 1.4.4 + bson: 7.0.0 + mongodb-connection-string-url: 7.0.0 + dev: false + /motion-dom@12.23.12: resolution: {integrity: sha512-RcR4fvMCTESQBD/uKQe49D5RUeDOokkGRmz4ceaJKDBgHYtZtntC/s2vLvY38gqGaytinij/yi3hMcWVcEF5Kw==} dependencies: @@ -36266,6 +36347,12 @@ packages: /space-separated-tokens@2.0.2: resolution: {integrity: sha512-PEGlAwrG8yXGXRjW32fGbg66JAlOAwbObuqVoJpv/mRgoWDQfgH1wDPvtzWyUSNAXBGSk8h755YDbbcEy3SH2Q==} + /sparse-bitfield@3.0.3: + resolution: {integrity: sha512-kvzhi7vqKTfkh0PZU+2D2PIllw2ymqJKujUcyPMd9Y75Nv4nPbGJZXNhxsgdQab2BmlDct1YnfQCguEvHr7VsQ==} + dependencies: + memory-pager: 1.5.0 + dev: false + /spawndamnit@3.0.1: resolution: {integrity: sha512-MmnduQUuHCoFckZoWnXsTg7JaiLBJrKFj9UI2MbRPGaJeVpsLcVBu6P/IGZovziM/YBsellCmsprgNA+w0CzVg==} dependencies: @@ -37373,6 +37460,13 @@ packages: punycode: 2.3.1 dev: true + /tr46@5.1.1: + resolution: {integrity: sha512-hdF5ZgjTqgAntKkklYw0R03MG2x/bSzTtkxmIRw/sTNV8YXsCJ1tfLAX23lhxhHJlEf3CRCOCGGWw3vI3GaSPw==} + engines: {node: '>=18'} + dependencies: + punycode: 2.3.1 + dev: false + /tree-kill@1.2.2: resolution: {integrity: sha512-L0Orpi8qGpRG//Nd+H90vFB+3iHnue1zSSGmNOOCh1GLJ7rUKVwV2HvijphGQS2UmhUZewS9VgvxYIdgr+fG1A==} hasBin: true @@ -39529,7 +39623,6 @@ packages: /webidl-conversions@7.0.0: resolution: {integrity: sha512-VwddBukDzu71offAQR975unBIGqfKZpM+8ZX6ySk8nYhVoo5CYaZyzt3YBvYtRtO+aoGlqxPg/B87NGVZ/fu6g==} engines: {node: '>=12'} - dev: true /webpack-node-externals@3.0.0: resolution: {integrity: sha512-LnL6Z3GGDPht/AigwRh2dvL9PQPFQ8skEpVrWZXLWBYmqcaojHNN0onvHzie6rq7EWKrrBfPYqNEzTJgiwEQDQ==} @@ -39621,6 +39714,14 @@ packages: webidl-conversions: 7.0.0 dev: true + /whatwg-url@14.2.0: + resolution: {integrity: sha512-De72GdQZzNTUBBChsXueQUnPKDkg/5A5zp7pFDuQAj5UFoENpiACU0wlCvzpAGnTkj++ihpKwKyYewn/XNUbKw==} + engines: {node: '>=18'} + dependencies: + tr46: 5.1.1 + webidl-conversions: 7.0.0 + dev: false + /whatwg-url@5.0.0: resolution: {integrity: sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw==} dependencies: From d59ad0e5e0f0322139e30c21abb365ef159eb6b7 Mon Sep 17 00:00:00 2001 From: UmeshpJadhav Date: Sat, 31 Jan 2026 11:00:16 +0530 Subject: [PATCH 2/8] fix: fixed the build failure and the implemented code review comments --- packages/mongodb/README.md | 12 ++--- packages/mongodb/src/memory-adapter.spec.ts | 57 +++++++++++---------- packages/mongodb/src/memory-adapter.ts | 57 +++++++++++---------- pnpm-lock.yaml | 24 +++++++-- 4 files changed, 86 insertions(+), 64 deletions(-) diff --git a/packages/mongodb/README.md b/packages/mongodb/README.md index 14232061e..f3e5ce150 100644 --- a/packages/mongodb/README.md +++ b/packages/mongodb/README.md @@ -32,9 +32,9 @@ const memory = new Memory({ ## Configuration -| Option | Type | Default | Description | -| str | str | str | str | -| `connection` | `string` | required | MongoDB connection URI | -| `database` | `string` | `"voltagent"` | Database name | -| `collectionPrefix` | `string` | `"voltagent_memory"` | Prefix for collections | -| `debug` | `boolean` | `false` | Enable debug logging | +| Option | Type | Default | Description | +| ------------------ | --------- | -------------------- | ---------------------- | +| `connection` | `string` | required | MongoDB connection URI | +| `database` | `string` | `"voltagent"` | Database name | +| `collectionPrefix` | `string` | `"voltagent_memory"` | Prefix for collections | +| `debug` | `boolean` | `false` | Enable debug logging | diff --git a/packages/mongodb/src/memory-adapter.spec.ts b/packages/mongodb/src/memory-adapter.spec.ts index c87fee035..96dac79f4 100644 --- a/packages/mongodb/src/memory-adapter.spec.ts +++ b/packages/mongodb/src/memory-adapter.spec.ts @@ -577,30 +577,33 @@ export class MongoDBMemoryAdapter implements StorageAdapter { const stepsCollection = this.getCollection("steps"); - const operations = steps.map((step) => ({ - replaceOne: { - filter: { _id: step.id || this.generateId() }, - replacement: { - _id: step.id || this.generateId(), - conversationId: step.conversationId, - userId: step.userId, - agentId: step.agentId, - agentName: step.agentName, - operationId: step.operationId, - stepIndex: step.stepIndex, - type: step.type, - role: step.role, - content: step.content, - arguments: step.arguments, - result: step.result, - usage: step.usage, - subAgentId: step.subAgentId, - subAgentName: step.subAgentName, - createdAt: new Date(), + const operations = steps.map((step) => { + const id = step.id || this.generateId(); + return { + replaceOne: { + filter: { _id: id }, + replacement: { + _id: id, + conversationId: step.conversationId, + userId: step.userId, + agentId: step.agentId, + agentName: step.agentName, + operationId: step.operationId, + stepIndex: step.stepIndex, + type: step.type, + role: step.role, + content: step.content, + arguments: step.arguments, + result: step.result, + usage: step.usage, + subAgentId: step.subAgentId, + subAgentName: step.subAgentName, + createdAt: new Date(), + }, + upsert: true, }, - upsert: true, - }, - })); + }; + }); await stepsCollection.bulkWrite(operations as any); @@ -807,8 +810,8 @@ export class MongoDBMemoryAdapter implements StorageAdapter { userId: (state as any).userId, conversationId: (state as any).conversationId, metadata: (state as any).metadata, - createdAt: (state as any).createdAt.toISOString(), - updatedAt: (state as any).updatedAt.toISOString(), + createdAt: (state as any).createdAt, + updatedAt: (state as any).updatedAt, }; } @@ -862,8 +865,8 @@ export class MongoDBMemoryAdapter implements StorageAdapter { userId: state.userId, conversationId: state.conversationId, metadata: state.metadata, - createdAt: state.createdAt.toISOString(), - updatedAt: state.updatedAt.toISOString(), + createdAt: state.createdAt, + updatedAt: state.updatedAt, })); } diff --git a/packages/mongodb/src/memory-adapter.ts b/packages/mongodb/src/memory-adapter.ts index b840d5559..942e6d4f0 100644 --- a/packages/mongodb/src/memory-adapter.ts +++ b/packages/mongodb/src/memory-adapter.ts @@ -578,30 +578,33 @@ export class MongoDBMemoryAdapter implements StorageAdapter { const stepsCollection = this.getCollection("steps"); - const operations = steps.map((step) => ({ - replaceOne: { - filter: { _id: step.id || this.generateId() }, - replacement: { - _id: step.id || this.generateId(), - conversationId: step.conversationId, - userId: step.userId, - agentId: step.agentId, - agentName: step.agentName, - operationId: step.operationId, - stepIndex: step.stepIndex, - type: step.type, - role: step.role, - content: step.content, - arguments: step.arguments, - result: step.result, - usage: step.usage, - subAgentId: step.subAgentId, - subAgentName: step.subAgentName, - createdAt: new Date(), + const operations = steps.map((step) => { + const id = step.id || this.generateId(); + return { + replaceOne: { + filter: { _id: id }, + replacement: { + _id: id, + conversationId: step.conversationId, + userId: step.userId, + agentId: step.agentId, + agentName: step.agentName, + operationId: step.operationId, + stepIndex: step.stepIndex, + type: step.type, + role: step.role, + content: step.content, + arguments: step.arguments, + result: step.result, + usage: step.usage, + subAgentId: step.subAgentId, + subAgentName: step.subAgentName, + createdAt: new Date(), + }, + upsert: true, }, - upsert: true, - }, - })); + }; + }); await stepsCollection.bulkWrite(operations as any); @@ -808,8 +811,8 @@ export class MongoDBMemoryAdapter implements StorageAdapter { userId: (state as any).userId, conversationId: (state as any).conversationId, metadata: (state as any).metadata, - createdAt: (state as any).createdAt.toISOString(), - updatedAt: (state as any).updatedAt.toISOString(), + createdAt: (state as any).createdAt, + updatedAt: (state as any).updatedAt, }; } @@ -863,8 +866,8 @@ export class MongoDBMemoryAdapter implements StorageAdapter { userId: state.userId, conversationId: state.conversationId, metadata: state.metadata, - createdAt: state.createdAt.toISOString(), - updatedAt: state.updatedAt.toISOString(), + createdAt: state.createdAt, + updatedAt: state.updatedAt, })); } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5f62cd597..146c7ab9d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -3005,7 +3005,7 @@ importers: version: link:../../packages/server-hono '@voltagent/voice': specifier: ^2.0.2 - version: link:../../packages/voice + version: 2.1.0(@voltagent/core@packages+core)(zod@3.25.76) ai: specifier: ^6.0.0 version: 6.0.3(zod@3.25.76) @@ -3042,7 +3042,7 @@ importers: version: link:../../packages/server-hono '@voltagent/voice': specifier: ^2.0.2 - version: link:../../packages/voice + version: 2.1.0(@voltagent/core@packages+core)(zod@3.25.76) ai: specifier: ^6.0.0 version: 6.0.3(zod@3.25.76) @@ -3085,7 +3085,7 @@ importers: version: link:../../packages/server-hono '@voltagent/voice': specifier: ^2.0.2 - version: link:../../packages/voice + version: 2.1.0(@voltagent/core@packages+core)(zod@3.25.76) ai: specifier: ^6.0.0 version: 6.0.3(zod@3.25.76) @@ -4018,7 +4018,7 @@ importers: version: link:../core ai: specifier: ^6.0.0 - version: 6.0.3(zod@4.2.1) + version: 6.0.3(zod@4.3.5) packages/postgres: dependencies: @@ -20003,6 +20003,22 @@ packages: zod: 3.25.76 dev: false + /@voltagent/voice@2.1.0(@voltagent/core@packages+core)(zod@3.25.76): + resolution: {integrity: sha512-TByV1ci+aV4q6cgNImZCDsMAkEP6tRXhcEF+WrXHisldCNynNfp8WiJvRwNdRRPkLVaqqXmj20Mnj0j6Avr81w==} + peerDependencies: + '@voltagent/core': ^2.0.0 + dependencies: + '@voltagent/core': link:packages/core + '@xsai/generate-speech': 0.4.0-beta.1 + '@xsai/generate-transcription': 0.4.0-beta.1 + elevenlabs: 1.59.0 + openai: 4.104.0(ws@8.18.3)(zod@3.25.76) + transitivePeerDependencies: + - encoding + - ws + - zod + dev: false + /@vue-macros/common@3.0.0-beta.16(vue@3.5.22): resolution: {integrity: sha512-8O2gWxWFiaoNkk7PGi0+p7NPGe/f8xJ3/INUufvje/RZOs7sJvlI1jnR4lydtRFa/mU0ylMXUXXjSK0fHDEYTA==} engines: {node: '>=20.18.0'} From ef863b8e57a0f815577a96280e5e83cacd60e9fa Mon Sep 17 00:00:00 2001 From: UmeshpJadhav Date: Sat, 31 Jan 2026 11:14:31 +0530 Subject: [PATCH 3/8] fix: handle race condition in createConversation --- packages/mongodb/src/memory-adapter.ts | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/packages/mongodb/src/memory-adapter.ts b/packages/mongodb/src/memory-adapter.ts index 942e6d4f0..85541d414 100644 --- a/packages/mongodb/src/memory-adapter.ts +++ b/packages/mongodb/src/memory-adapter.ts @@ -361,12 +361,6 @@ export class MongoDBMemoryAdapter implements StorageAdapter { const conversationsCollection = this.getCollection("conversations"); - // Check if conversation already exists - const existing = await conversationsCollection.findOne({ _id: input.id } as any); - if (existing) { - throw new ConversationAlreadyExistsError(input.id); - } - const now = new Date(); const conversation = { _id: input.id, @@ -378,7 +372,14 @@ export class MongoDBMemoryAdapter implements StorageAdapter { updatedAt: now, }; - await conversationsCollection.insertOne(conversation as any); + try { + await conversationsCollection.insertOne(conversation as any); + } catch (error: any) { + if (error.code === 11000) { + throw new ConversationAlreadyExistsError(input.id); + } + throw error; + } this.log(`Created conversation ${input.id}`); From 76dab7a318548a5a72a8d84aaa4f80ed3cb2c9b7 Mon Sep 17 00:00:00 2001 From: UmeshpJadhav Date: Sat, 31 Jan 2026 11:15:59 +0530 Subject: [PATCH 4/8] fix: explicitly externalize dependencies in tsup build --- packages/mongodb/tsup.config.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/mongodb/tsup.config.ts b/packages/mongodb/tsup.config.ts index 0819104fd..ae0c8c510 100644 --- a/packages/mongodb/tsup.config.ts +++ b/packages/mongodb/tsup.config.ts @@ -11,6 +11,7 @@ export default defineConfig({ outDir: "dist", minify: false, dts: true, + external: ["@voltagent/core", "ai", "mongodb"], esbuildPlugins: [markAsExternalPlugin], esbuildOptions(options) { options.keepNames = true; From be1fd7143e5db61bb476c68708e54507ed22a541 Mon Sep 17 00:00:00 2001 From: UmeshpJadhav Date: Sat, 31 Jan 2026 11:29:08 +0530 Subject: [PATCH 5/8] fix: implement missing StorageAdapter methods --- packages/mongodb/src/memory-adapter.spec.ts | 17 ++++++------ packages/mongodb/src/memory-adapter.ts | 29 +++++++++++++++++++-- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/packages/mongodb/src/memory-adapter.spec.ts b/packages/mongodb/src/memory-adapter.spec.ts index 96dac79f4..d96987eae 100644 --- a/packages/mongodb/src/memory-adapter.spec.ts +++ b/packages/mongodb/src/memory-adapter.spec.ts @@ -358,13 +358,7 @@ export class MongoDBMemoryAdapter implements StorageAdapter { async createConversation(input: CreateConversationInput): Promise { await this.initPromise; - const conversationsCollection = this.getCollection("conversations"); - - // Check if conversation already exists - const existing = await conversationsCollection.findOne({ _id: input.id } as any); - if (existing) { - throw new ConversationAlreadyExistsError(input.id); - } + const conversationsCollection = this.getCollection("conversations"); const now = new Date(); const conversation = { @@ -377,7 +371,14 @@ export class MongoDBMemoryAdapter implements StorageAdapter { updatedAt: now, }; - await conversationsCollection.insertOne(conversation as any); + try { + await conversationsCollection.insertOne(conversation as any); + } catch (error: any) { + if (error.code === 11000) { + throw new ConversationAlreadyExistsError(input.id); + } + throw error; + } this.log(`Created conversation ${input.id}`); diff --git a/packages/mongodb/src/memory-adapter.ts b/packages/mongodb/src/memory-adapter.ts index 85541d414..812d25391 100644 --- a/packages/mongodb/src/memory-adapter.ts +++ b/packages/mongodb/src/memory-adapter.ts @@ -316,6 +316,21 @@ export class MongoDBMemoryAdapter implements StorageAdapter { })); } + /** + * Delete all messages for a conversation + */ + async deleteMessages(conversationId: string): Promise { + await this.initPromise; + + const messagesCollection = this.getCollection("messages"); + const stepsCollection = this.getCollection("steps"); + + await messagesCollection.deleteMany({ conversationId }); + await stepsCollection.deleteMany({ conversationId }); + + this.log(`Deleted messages for conversation ${conversationId}`); + } + /** * Clear messages for a conversation or all conversations for a user */ @@ -359,7 +374,7 @@ export class MongoDBMemoryAdapter implements StorageAdapter { async createConversation(input: CreateConversationInput): Promise { await this.initPromise; - const conversationsCollection = this.getCollection("conversations"); + const conversationsCollection = this.getCollection("conversations"); const now = new Date(); const conversation = { @@ -373,7 +388,7 @@ export class MongoDBMemoryAdapter implements StorageAdapter { }; try { - await conversationsCollection.insertOne(conversation as any); + await conversationsCollection.insertOne(conversation); } catch (error: any) { if (error.code === 11000) { throw new ConversationAlreadyExistsError(input.id); @@ -394,6 +409,16 @@ export class MongoDBMemoryAdapter implements StorageAdapter { }; } + /** + * Count conversations for a user + */ + async countConversations(userId: string): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + return conversationsCollection.countDocuments({ userId }); + } + /** * Get a conversation by ID */ From 545a62ab94d4d1005040c823db36054d474b359c Mon Sep 17 00:00:00 2001 From: UmeshpJadhav Date: Sat, 31 Jan 2026 11:37:40 +0530 Subject: [PATCH 6/8] fix: resolve security vulnerability in deleteMessages and implement missing methods --- packages/mongodb/src/memory-adapter.ts | 34 +++++++++++++++++++------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/packages/mongodb/src/memory-adapter.ts b/packages/mongodb/src/memory-adapter.ts index 812d25391..c4e17233b 100644 --- a/packages/mongodb/src/memory-adapter.ts +++ b/packages/mongodb/src/memory-adapter.ts @@ -317,18 +317,24 @@ export class MongoDBMemoryAdapter implements StorageAdapter { } /** - * Delete all messages for a conversation + * Delete specific messages */ - async deleteMessages(conversationId: string): Promise { + async deleteMessages( + messageIds: string[], + userId: string, + conversationId: string, + ): Promise { await this.initPromise; const messagesCollection = this.getCollection("messages"); - const stepsCollection = this.getCollection("steps"); - await messagesCollection.deleteMany({ conversationId }); - await stepsCollection.deleteMany({ conversationId }); + await messagesCollection.deleteMany({ + conversationId, + userId, + messageId: { $in: messageIds }, + }); - this.log(`Deleted messages for conversation ${conversationId}`); + this.log(`Deleted ${messageIds.length} messages from conversation ${conversationId}`); } /** @@ -410,13 +416,23 @@ export class MongoDBMemoryAdapter implements StorageAdapter { } /** - * Count conversations for a user + * Count conversations based on filters */ - async countConversations(userId: string): Promise { + async countConversations(options: ConversationQueryOptions): Promise { await this.initPromise; const conversationsCollection = this.getCollection("conversations"); - return conversationsCollection.countDocuments({ userId }); + const filter: any = {}; + + if (options.userId) { + filter.userId = options.userId; + } + + if (options.resourceId) { + filter.resourceId = options.resourceId; + } + + return conversationsCollection.countDocuments(filter); } /** From 7d0eb68835f0c41c402bc6a72c366c80f4c867b5 Mon Sep 17 00:00:00 2001 From: UmeshpJadhav Date: Sat, 31 Jan 2026 12:11:58 +0530 Subject: [PATCH 7/8] fix: refactor ID handling, update logic, and tests --- packages/mongodb/src/memory-adapter.spec.ts | 1047 ++----------------- packages/mongodb/src/memory-adapter.ts | 42 +- 2 files changed, 92 insertions(+), 997 deletions(-) diff --git a/packages/mongodb/src/memory-adapter.spec.ts b/packages/mongodb/src/memory-adapter.spec.ts index d96987eae..82caa89ba 100644 --- a/packages/mongodb/src/memory-adapter.spec.ts +++ b/packages/mongodb/src/memory-adapter.spec.ts @@ -1,980 +1,73 @@ -/** - * MongoDB Storage Adapter for Memory - * Stores conversations and messages in MongoDB database - */ - -import { ConversationAlreadyExistsError, ConversationNotFoundError } from "@voltagent/core"; -import type { - Conversation, - ConversationQueryOptions, - ConversationStepRecord, - CreateConversationInput, - GetConversationStepsOptions, - GetMessagesOptions, - StorageAdapter, - WorkflowRunQuery, - WorkflowStateEntry, - WorkingMemoryScope, -} from "@voltagent/core"; -import type { UIMessage } from "ai"; -import { type Collection, type Db, type Document, MongoClient } from "mongodb"; - -/** - * MongoDB configuration options for Memory - */ -export interface MongoDBMemoryOptions { - /** - * MongoDB connection URI - * Examples: - * - "mongodb://localhost:27017" - * - "mongodb://username:password@localhost:27017" - * - "mongodb+srv://username:password@cluster.mongodb.net" - */ - connection: string; - - /** - * Database name to use for collections - * @default "voltagent" - */ - database?: string; - - /** - * Prefix for collection names - * @default "voltagent_memory" - */ - collectionPrefix?: string; - - /** - * Whether to enable debug logging - * @default false - */ - debug?: boolean; -} - -/** - * MongoDB Storage Adapter for Memory - * Production-ready storage for conversations and messages - */ -export class MongoDBMemoryAdapter implements StorageAdapter { - private client: MongoClient; - private db: Db | null = null; - private databaseName: string; - private collectionPrefix: string; - private initialized = false; - private initPromise: Promise | null = null; - private debug: boolean; - - constructor(options: MongoDBMemoryOptions) { - this.databaseName = options.database ?? "voltagent"; - this.collectionPrefix = options.collectionPrefix ?? "voltagent_memory"; - this.debug = options.debug ?? false; - - // Validate collection prefix - if ( - this.collectionPrefix.includes("\0") || - this.collectionPrefix.includes("$") || - this.collectionPrefix.startsWith("system.") - ) { - throw new Error(`Invalid collection prefix: ${this.collectionPrefix}`); - } - - // Create MongoDB client - this.client = new MongoClient(options.connection); - - this.log("MongoDB Memory adapter initialized"); - - // Start initialization but don't await it - this.initPromise = this.initialize(); - } - - /** - * Log debug messages - */ - private log(...args: any[]): void { - if (this.debug) { - console.log("[MongoDB Memory]", ...args); - } - } - - /** - * Generate a random ID - */ - private generateId(): string { - return ( - Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) - ); - } - - /** - * Get collection by name - */ - private getCollection(collectionName: string): Collection { - if (!this.db) { - throw new Error("Database not initialized"); - } - return this.db.collection(`${this.collectionPrefix}_${collectionName}`); - } - - /** - * Initialize database schema - */ - private async initialize(): Promise { - if (this.initialized) return; - - // Prevent multiple simultaneous initializations - if (this.initPromise && !this.initialized) { - return this.initPromise; - } - - try { - // Connect to MongoDB - await this.client.connect(); - this.db = this.client.db(this.databaseName); - - this.log(`Connected to MongoDB database: ${this.databaseName}`); - - // Create indexes for all collections - const conversationsCollection = this.getCollection("conversations"); - const messagesCollection = this.getCollection("messages"); - const workflowStatesCollection = this.getCollection("workflow_states"); - const stepsCollection = this.getCollection("steps"); - - // Users collection indexes (none needed beyond _id) - - // Conversations collection indexes - await conversationsCollection.createIndex({ userId: 1 }, { background: true }); - await conversationsCollection.createIndex({ resourceId: 1 }, { background: true }); - await conversationsCollection.createIndex({ updatedAt: -1 }, { background: true }); - - // Messages collection indexes - await messagesCollection.createIndex( - { conversationId: 1, createdAt: 1 }, - { background: true }, - ); - await messagesCollection.createIndex({ conversationId: 1 }, { background: true }); - // Unique compound index to enforce message uniqueness - await messagesCollection.createIndex( - { conversationId: 1, messageId: 1 }, - { unique: true, background: true }, - ); - - // Workflow states collection indexes - await workflowStatesCollection.createIndex({ workflowId: 1 }, { background: true }); - await workflowStatesCollection.createIndex({ status: 1 }, { background: true }); - await workflowStatesCollection.createIndex({ createdAt: -1 }, { background: true }); - - // Steps collection indexes - await stepsCollection.createIndex({ conversationId: 1, stepIndex: 1 }, { background: true }); - await stepsCollection.createIndex( - { conversationId: 1, operationId: 1 }, - { background: true }, - ); - - this.initialized = true; - this.log("Database schema initialized with indexes"); - } catch (error) { - throw new Error( - `Failed to connect to MongoDB: ${error instanceof Error ? error.message : String(error)}`, - ); - } - } - - /** - * Close MongoDB connection - */ - async close(): Promise { - await this.client.close(); - this.log("MongoDB connection closed"); - } - - // ============================================================================ - // Message Operations - // ============================================================================ - - /** - * Add a single message - */ - async addMessage(message: UIMessage, userId: string, conversationId: string): Promise { - await this.initPromise; - - const messagesCollection = this.getCollection("messages"); - - // Ensure conversation exists - const conversation = await this.getConversation(conversationId); - if (!conversation) { - throw new ConversationNotFoundError(conversationId); - } - - const messageId = message.id || this.generateId(); - - try { - await messagesCollection.insertOne({ - _id: undefined, // Let MongoDB generate ObjectId - conversationId, - messageId, - userId, - role: message.role, - parts: message.parts, - metadata: message.metadata || {}, - formatVersion: 2, - createdAt: new Date(), - } as any); - - this.log(`Added message ${messageId} to conversation ${conversationId}`); - } catch (error: any) { - if (error.code === 11000) { - throw new Error( - `Message with ID ${messageId} already exists in conversation ${conversationId}`, - ); - } - throw error; - } - } - - /** - * Add multiple messages - */ - async addMessages(messages: UIMessage[], userId: string, conversationId: string): Promise { - await this.initPromise; - - if (messages.length === 0) return; - - const messagesCollection = this.getCollection("messages"); - - // Ensure conversation exists - const conversation = await this.getConversation(conversationId); - if (!conversation) { - throw new ConversationNotFoundError(conversationId); - } - - const documentsToInsert = messages.map((message) => ({ - _id: undefined, // Let MongoDB generate ObjectId - conversationId, - messageId: message.id || this.generateId(), - userId, - role: message.role, - parts: message.parts, - metadata: message.metadata || {}, - formatVersion: 2, - createdAt: new Date(), - })); - - try { - await messagesCollection.insertMany(documentsToInsert as any); - this.log(`Added ${messages.length} messages to conversation ${conversationId}`); - } catch (error: any) { - if (error.code === 11000) { - throw new Error(`One or more messages already exist in conversation ${conversationId}`); - } - throw error; - } - } - - /** - * Get messages for a conversation - */ - async getMessages( - userId: string, - conversationId: string, - options?: GetMessagesOptions, - ): Promise[]> { - await this.initPromise; - - const messagesCollection = this.getCollection("messages"); - - const filter: any = { conversationId, userId }; - - if (options?.roles && options.roles.length > 0) { - filter.role = { $in: options.roles }; - } - - if (options?.after) { - filter.createdAt = { $gt: options.after }; - } - - if (options?.before) { - filter.createdAt = { ...filter.createdAt, $lt: options.before }; - } - - let cursor = messagesCollection.find(filter).sort({ createdAt: 1 }); - - if (options?.limit) { - cursor = cursor.limit(options.limit); - } - - const messages = await cursor.toArray(); - - return messages.map((msg: any) => ({ - id: msg.messageId, - role: msg.role, - parts: msg.parts, - metadata: { - ...msg.metadata, - createdAt: msg.createdAt, - }, - })); - } - - /** - * Clear messages for a conversation or all conversations for a user - */ - async clearMessages(userId: string, conversationId?: string): Promise { - await this.initPromise; - - const messagesCollection = this.getCollection("messages"); - const stepsCollection = this.getCollection("steps"); - - if (conversationId) { - // Clear messages for specific conversation - await messagesCollection.deleteMany({ conversationId, userId }); - await stepsCollection.deleteMany({ conversationId, userId }); - this.log(`Cleared messages for conversation ${conversationId}`); - } else { - // Clear all messages for user - // First get all conversation IDs for this user - const conversationsCollection = this.getCollection("conversations"); - const userConversations = await conversationsCollection - .find({ userId }) - .project({ _id: 1 }) - .toArray(); - - const conversationIds = userConversations.map((conv: any) => conv._id); - - if (conversationIds.length > 0) { - await messagesCollection.deleteMany({ conversationId: { $in: conversationIds } }); - await stepsCollection.deleteMany({ conversationId: { $in: conversationIds } }); - this.log(`Cleared all messages for user ${userId}`); - } - } - } - - // ============================================================================ - // Conversation Operations - // ============================================================================ - - /** - * Create a new conversation - */ - async createConversation(input: CreateConversationInput): Promise { - await this.initPromise; - - const conversationsCollection = this.getCollection("conversations"); - - const now = new Date(); - const conversation = { - _id: input.id, - resourceId: input.resourceId, - userId: input.userId, - title: input.title, - metadata: input.metadata || {}, - createdAt: now, - updatedAt: now, - }; - - try { - await conversationsCollection.insertOne(conversation as any); - } catch (error: any) { - if (error.code === 11000) { - throw new ConversationAlreadyExistsError(input.id); - } - throw error; - } - - this.log(`Created conversation ${input.id}`); - - return { - id: conversation._id, - resourceId: conversation.resourceId, - userId: conversation.userId, - title: conversation.title, - metadata: conversation.metadata, - createdAt: conversation.createdAt.toISOString(), - updatedAt: conversation.updatedAt.toISOString(), - }; - } - - /** - * Get a conversation by ID - */ - async getConversation(id: string): Promise { - await this.initPromise; - - const conversationsCollection = this.getCollection("conversations"); - const conversation = await conversationsCollection.findOne({ _id: id } as any); - - if (!conversation) { - return null; - } - - return { - id: (conversation as any)._id, - resourceId: (conversation as any).resourceId, - userId: (conversation as any).userId, - title: (conversation as any).title, - metadata: (conversation as any).metadata || {}, - createdAt: (conversation as any).createdAt.toISOString(), - updatedAt: (conversation as any).updatedAt.toISOString(), - }; - } - - /** - * Get all conversations for a resource - */ - async getConversations(resourceId: string): Promise { - await this.initPromise; - - const conversationsCollection = this.getCollection("conversations"); - const conversations = await conversationsCollection - .find({ resourceId } as any) - .sort({ updatedAt: -1 }) - .toArray(); - - return conversations.map((conv: any) => ({ - id: conv._id, - resourceId: conv.resourceId, - userId: conv.userId, - title: conv.title, - metadata: conv.metadata || {}, - createdAt: conv.createdAt.toISOString(), - updatedAt: conv.updatedAt.toISOString(), - })); - } - - /** - * Get all conversations for a user - */ - async getConversationsByUserId( - userId: string, - options?: Omit, - ): Promise { - return this.queryConversations({ ...options, userId }); - } - - /** - * Query conversations with filters - */ - async queryConversations(options: ConversationQueryOptions): Promise { - await this.initPromise; - - const conversationsCollection = this.getCollection("conversations"); - - const filter: any = {}; - - if (options.userId) { - filter.userId = options.userId; - } - - if (options.resourceId) { - filter.resourceId = options.resourceId; - } - - let cursor = conversationsCollection.find(filter).sort({ updatedAt: -1 }); - - if (options.limit) { - cursor = cursor.limit(options.limit); - } - - if (options.offset) { - cursor = cursor.skip(options.offset); - } - - const conversations = await cursor.toArray(); - - return conversations.map((conv: any) => ({ - id: conv._id, - resourceId: conv.resourceId, - userId: conv.userId, - title: conv.title, - metadata: conv.metadata || {}, - createdAt: conv.createdAt.toISOString(), - updatedAt: conv.updatedAt.toISOString(), - })); - } - - /** - * Update a conversation - */ - async updateConversation( - id: string, - updates: Partial>, - ): Promise { - await this.initPromise; - - const conversationsCollection = this.getCollection("conversations"); - - const updateDoc: any = { - updatedAt: new Date(), - }; - - if (updates.title !== undefined) { - updateDoc.title = updates.title; - } - - if (updates.metadata !== undefined) { - updateDoc.metadata = updates.metadata; - } - - if (updates.resourceId !== undefined) { - updateDoc.resourceId = updates.resourceId; - } - - if (updates.userId !== undefined) { - updateDoc.userId = updates.userId; - } - - const result = await conversationsCollection.findOneAndUpdate( - { _id: id } as any, - { $set: updateDoc }, - { returnDocument: "after" }, - ); - - if (!result) { - throw new ConversationNotFoundError(id); - } - - this.log(`Updated conversation ${id}`); - - return { - id: (result as any)._id, - resourceId: (result as any).resourceId, - userId: (result as any).userId, - title: (result as any).title, - metadata: (result as any).metadata || {}, - createdAt: (result as any).createdAt.toISOString(), - updatedAt: (result as any).updatedAt.toISOString(), - }; - } - - /** - * Delete a conversation - */ - async deleteConversation(id: string): Promise { - await this.initPromise; - - const conversationsCollection = this.getCollection("conversations"); - - // MongoDB will cascade delete messages and steps automatically via application logic - const messagesCollection = this.getCollection("messages"); - const stepsCollection = this.getCollection("steps"); - - await messagesCollection.deleteMany({ conversationId: id } as any); - await stepsCollection.deleteMany({ conversationId: id } as any); - await conversationsCollection.deleteOne({ _id: id } as any); - - this.log(`Deleted conversation ${id}`); - } - - // ============================================================================ - // Conversation Steps Operations - // ============================================================================ - - /** - * Save conversation steps - */ - async saveConversationSteps(steps: ConversationStepRecord[]): Promise { - await this.initPromise; - - if (steps.length === 0) return; - - const stepsCollection = this.getCollection("steps"); - - const operations = steps.map((step) => { - const id = step.id || this.generateId(); - return { - replaceOne: { - filter: { _id: id }, - replacement: { - _id: id, - conversationId: step.conversationId, - userId: step.userId, - agentId: step.agentId, - agentName: step.agentName, - operationId: step.operationId, - stepIndex: step.stepIndex, - type: step.type, - role: step.role, - content: step.content, - arguments: step.arguments, - result: step.result, - usage: step.usage, - subAgentId: step.subAgentId, - subAgentName: step.subAgentName, - createdAt: new Date(), - }, - upsert: true, - }, - }; +import { MongoClient } from "mongodb"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { MongoDBMemoryAdapter } from "./memory-adapter"; + +vi.mock("mongodb", () => { + const collection = { + createIndex: vi.fn(), + insertOne: vi.fn(), + insertMany: vi.fn(), + find: vi.fn().mockReturnThis(), + sort: vi.fn().mockReturnThis(), + limit: vi.fn().mockReturnThis(), + skip: vi.fn().mockReturnThis(), + toArray: vi.fn().mockResolvedValue([]), + findOne: vi.fn(), + updateOne: vi.fn(), + deleteMany: vi.fn(), + deleteOne: vi.fn(), + countDocuments: vi.fn(), + bulkWrite: vi.fn(), + }; + + const db = { + collection: vi.fn().mockReturnValue(collection), + }; + + const client = { + connect: vi.fn(), + db: vi.fn().mockReturnValue(db), + close: vi.fn(), + }; + + return { + MongoClient: vi.fn().mockImplementation(() => client), + }; +}); + +describe("MongoDBMemoryAdapter", () => { + let adapter: MongoDBMemoryAdapter; + + beforeEach(() => { + vi.clearAllMocks(); + adapter = new MongoDBMemoryAdapter({ + connection: "mongodb://localhost:27017", }); - - await stepsCollection.bulkWrite(operations as any); - - this.log(`Saved ${steps.length} conversation steps`); - } - - /** - * Get conversation steps - */ - async getConversationSteps( - userId: string, - conversationId: string, - options?: GetConversationStepsOptions, - ): Promise { - await this.initPromise; - - const stepsCollection = this.getCollection("steps"); - - const filter: any = { conversationId, userId }; - - if (options?.operationId) { - filter.operationId = options.operationId; - } - - let cursor = stepsCollection.find(filter).sort({ stepIndex: 1 }); - - if (options?.limit) { - cursor = cursor.limit(options.limit); - } - - const steps = await cursor.toArray(); - - return steps.map((step: any) => ({ - id: step._id, - conversationId: step.conversationId, - userId: step.userId, - agentId: step.agentId, - agentName: step.agentName, - operationId: step.operationId, - stepIndex: step.stepIndex, - type: step.type, - role: step.role, - content: step.content, - arguments: step.arguments, - result: step.result, - usage: step.usage, - subAgentId: step.subAgentId, - subAgentName: step.subAgentName, - createdAt: step.createdAt.toISOString(), - })); - } - - // ============================================================================ - // Working Memory Operations - // ============================================================================ - - /** - * Get working memory - */ - async getWorkingMemory(params: { - conversationId?: string; - userId?: string; - scope: WorkingMemoryScope; - }): Promise { - await this.initPromise; - - if (params.scope === "conversation" && params.conversationId) { - const conversationsCollection = this.getCollection("conversations"); - const conversation = await conversationsCollection.findOne({ - _id: params.conversationId, - } as any); - - if (!conversation) { - return null; - } - - const workingMemory = (conversation as any).metadata?.workingMemory; - return workingMemory || null; - } - - if (params.scope === "user" && params.userId) { - const usersCollection = this.getCollection("users"); - const user = await usersCollection.findOne({ _id: params.userId } as any); - - if (!user) { - return null; - } - - const workingMemory = (user as any).metadata?.workingMemory; - return workingMemory || null; - } - - return null; - } - - /** - * Set working memory - */ - async setWorkingMemory(params: { - conversationId?: string; - userId?: string; - content: string; - scope: WorkingMemoryScope; - }): Promise { - await this.initPromise; - - if (params.scope === "conversation" && params.conversationId) { - const conversationsCollection = this.getCollection("conversations"); - - const conversation = await conversationsCollection.findOne({ - _id: params.conversationId, - } as any); - if (!conversation) { - throw new ConversationNotFoundError(params.conversationId); - } - - await conversationsCollection.updateOne({ _id: params.conversationId } as any, { - $set: { - "metadata.workingMemory": params.content, - updatedAt: new Date(), - }, - }); - - this.log(`Set working memory for conversation ${params.conversationId}`); - } else if (params.scope === "user" && params.userId) { - const usersCollection = this.getCollection("users"); - - // Upsert user document with working memory - await usersCollection.updateOne( - { _id: params.userId } as any, - { - $set: { - "metadata.workingMemory": params.content, - updatedAt: new Date(), - }, - $setOnInsert: { - createdAt: new Date(), - }, - }, - { upsert: true }, - ); - - this.log(`Set working memory for user ${params.userId}`); - } - } - - /** - * Delete working memory - */ - async deleteWorkingMemory(params: { - conversationId?: string; - userId?: string; - scope: WorkingMemoryScope; - }): Promise { - await this.initPromise; - - if (params.scope === "conversation" && params.conversationId) { - const conversationsCollection = this.getCollection("conversations"); - - await conversationsCollection.updateOne({ _id: params.conversationId } as any, { - $unset: { "metadata.workingMemory": "" }, - $set: { updatedAt: new Date() }, - }); - - this.log(`Deleted working memory for conversation ${params.conversationId}`); - } else if (params.scope === "user" && params.userId) { - const usersCollection = this.getCollection("users"); - - await usersCollection.updateOne({ _id: params.userId } as any, { - $unset: { "metadata.workingMemory": "" }, - $set: { updatedAt: new Date() }, - }); - - this.log(`Deleted working memory for user ${params.userId}`); - } - } - - // ============================================================================ - // Workflow State Operations - // ============================================================================ - - /** - * Get workflow state by execution ID - */ - async getWorkflowState(executionId: string): Promise { - await this.initPromise; - - const workflowStatesCollection = this.getCollection("workflow_states"); - const state = await workflowStatesCollection.findOne({ _id: executionId } as any); - - if (!state) { - return null; - } - - return { - id: (state as any)._id, - workflowId: (state as any).workflowId, - workflowName: (state as any).workflowName, - status: (state as any).status, - suspension: (state as any).suspension, - events: (state as any).events, - output: (state as any).output, - cancellation: (state as any).cancellation, - userId: (state as any).userId, - conversationId: (state as any).conversationId, - metadata: (state as any).metadata, - createdAt: (state as any).createdAt, - updatedAt: (state as any).updatedAt, + }); + + afterEach(async () => { + await adapter.close(); + }); + + it("should be defined", () => { + expect(adapter).toBeDefined(); + }); + + it("should initialize correctly", async () => { + await (adapter as any).initialize(); + expect(MongoClient).toHaveBeenCalledTimes(1); + }); + + it("should perform createConversation", async () => { + const input = { + id: "test-conv-id", + resourceId: "resource-1", + userId: "user-1", + title: "Test Conversation", + metadata: {}, }; - } - - /** - * Query workflow runs with filters - */ - async queryWorkflowRuns(query: WorkflowRunQuery): Promise { - await this.initPromise; - - const workflowStatesCollection = this.getCollection("workflow_states"); - - const filter: any = {}; - - if (query.workflowId) { - filter.workflowId = query.workflowId; - } - - if (query.status) { - filter.status = query.status; - } - - if (query.from) { - filter.createdAt = { $gte: query.from }; - } - - if (query.to) { - filter.createdAt = { ...filter.createdAt, $lte: query.to }; - } - - let cursor = workflowStatesCollection.find(filter).sort({ createdAt: -1 }); - - if (query.limit) { - cursor = cursor.limit(query.limit); - } - - if (query.offset) { - cursor = cursor.skip(query.offset); - } - - const states = await cursor.toArray(); - - return states.map((state: any) => ({ - id: state._id, - workflowId: state.workflowId, - workflowName: state.workflowName, - status: state.status, - suspension: state.suspension, - events: state.events, - output: state.output, - cancellation: state.cancellation, - userId: state.userId, - conversationId: state.conversationId, - metadata: state.metadata, - createdAt: state.createdAt, - updatedAt: state.updatedAt, - })); - } - - /** - * Set workflow state (create or replace) - */ - async setWorkflowState(executionId: string, state: WorkflowStateEntry): Promise { - await this.initPromise; - - const workflowStatesCollection = this.getCollection("workflow_states"); - - const now = new Date(); - - await workflowStatesCollection.replaceOne( - { _id: executionId } as any, - { - _id: executionId, - workflowId: state.workflowId, - workflowName: state.workflowName, - status: state.status, - suspension: state.suspension, - events: state.events, - output: state.output, - cancellation: state.cancellation, - userId: state.userId, - conversationId: state.conversationId, - metadata: state.metadata, - createdAt: state.createdAt ? new Date(state.createdAt) : now, - updatedAt: now, - } as any, - { upsert: true }, - ); - - this.log(`Set workflow state ${executionId}`); - } - - /** - * Update workflow state (partial update) - */ - async updateWorkflowState( - executionId: string, - updates: Partial, - ): Promise { - await this.initPromise; - - const workflowStatesCollection = this.getCollection("workflow_states"); - - const updateDoc: any = { - updatedAt: new Date(), - }; - - if (updates.status !== undefined) { - updateDoc.status = updates.status; - } - - if (updates.suspension !== undefined) { - updateDoc.suspension = updates.suspension; - } - - if (updates.events !== undefined) { - updateDoc.events = updates.events; - } - - if (updates.output !== undefined) { - updateDoc.output = updates.output; - } - - if (updates.cancellation !== undefined) { - updateDoc.cancellation = updates.cancellation; - } - - if (updates.metadata !== undefined) { - updateDoc.metadata = updates.metadata; - } - - await workflowStatesCollection.updateOne({ _id: executionId } as any, { $set: updateDoc }); - - this.log(`Updated workflow state ${executionId}`); - } - - /** - * Get suspended workflow states - */ - async getSuspendedWorkflowStates(workflowId: string): Promise { - await this.initPromise; - - const workflowStatesCollection = this.getCollection("workflow_states"); - - const states = await workflowStatesCollection - .find({ workflowId, status: "suspended" } as any) - .sort({ createdAt: -1 }) - .toArray(); - return states.map((state: any) => ({ - id: state._id, - workflowId: state.workflowId, - workflowName: state.workflowName, - status: state.status, - suspension: state.suspension, - events: state.events, - output: state.output, - cancellation: state.cancellation, - userId: state.userId, - conversationId: state.conversationId, - metadata: state.metadata, - createdAt: state.createdAt, - updatedAt: state.updatedAt, - })); - } -} + const conv = await adapter.createConversation(input); + expect(conv.id).toBe(input.id); + }); +}); diff --git a/packages/mongodb/src/memory-adapter.ts b/packages/mongodb/src/memory-adapter.ts index c4e17233b..91e5b5d94 100644 --- a/packages/mongodb/src/memory-adapter.ts +++ b/packages/mongodb/src/memory-adapter.ts @@ -210,7 +210,6 @@ export class MongoDBMemoryAdapter implements StorageAdapter { try { await messagesCollection.insertOne({ - _id: undefined, conversationId, messageId, userId, @@ -249,7 +248,6 @@ export class MongoDBMemoryAdapter implements StorageAdapter { } const documentsToInsert = messages.map((message) => ({ - _id: undefined, // Let MongoDB generate ObjectId conversationId, messageId: message.id || this.generateId(), userId, @@ -623,25 +621,29 @@ export class MongoDBMemoryAdapter implements StorageAdapter { const operations = steps.map((step) => { const id = step.id || this.generateId(); return { - replaceOne: { + updateOne: { filter: { _id: id }, - replacement: { - _id: id, - conversationId: step.conversationId, - userId: step.userId, - agentId: step.agentId, - agentName: step.agentName, - operationId: step.operationId, - stepIndex: step.stepIndex, - type: step.type, - role: step.role, - content: step.content, - arguments: step.arguments, - result: step.result, - usage: step.usage, - subAgentId: step.subAgentId, - subAgentName: step.subAgentName, - createdAt: new Date(), + update: { + $set: { + conversationId: step.conversationId, + userId: step.userId, + agentId: step.agentId, + agentName: step.agentName, + operationId: step.operationId, + stepIndex: step.stepIndex, + type: step.type, + role: step.role, + content: step.content, + arguments: step.arguments, + result: step.result, + usage: step.usage, + subAgentId: step.subAgentId, + subAgentName: step.subAgentName, + }, + $setOnInsert: { + _id: id, + createdAt: new Date(), + }, }, upsert: true, }, From ee9c1ffddfdefc048ed049f5b4d61d5056be843e Mon Sep 17 00:00:00 2001 From: UmeshpJadhav Date: Wed, 4 Mar 2026 20:19:51 +0530 Subject: [PATCH 8/8] feat: add minimal with-mongodb usage example --- examples/with-mongodb/.env.example | 7 ++++ examples/with-mongodb/.gitignore | 3 ++ examples/with-mongodb/README.md | 53 +++++++++++++++++++++++++++++ examples/with-mongodb/package.json | 41 ++++++++++++++++++++++ examples/with-mongodb/src/index.ts | 42 +++++++++++++++++++++++ examples/with-mongodb/tsconfig.json | 17 +++++++++ 6 files changed, 163 insertions(+) create mode 100644 examples/with-mongodb/.env.example create mode 100644 examples/with-mongodb/.gitignore create mode 100644 examples/with-mongodb/README.md create mode 100644 examples/with-mongodb/package.json create mode 100644 examples/with-mongodb/src/index.ts create mode 100644 examples/with-mongodb/tsconfig.json diff --git a/examples/with-mongodb/.env.example b/examples/with-mongodb/.env.example new file mode 100644 index 000000000..d61798c48 --- /dev/null +++ b/examples/with-mongodb/.env.example @@ -0,0 +1,7 @@ +# MongoDB Configuration +MONGO_URI=mongodb://localhost:27017 +# or for Atlas: +# MONGO_URI=mongodb+srv://user:password@cluster.mongodb.net + +# OpenAI API Key (required) +OPENAI_API_KEY=your_openai_api_key_here diff --git a/examples/with-mongodb/.gitignore b/examples/with-mongodb/.gitignore new file mode 100644 index 000000000..9c97bbd46 --- /dev/null +++ b/examples/with-mongodb/.gitignore @@ -0,0 +1,3 @@ +node_modules +dist +.env diff --git a/examples/with-mongodb/README.md b/examples/with-mongodb/README.md new file mode 100644 index 000000000..999285307 --- /dev/null +++ b/examples/with-mongodb/README.md @@ -0,0 +1,53 @@ +
+ +435380213-b6253409-8741-462b-a346-834cd18565a9 + + +
+
+ + +
+ +
+ +
+ VoltAgent is an open source TypeScript framework for building and orchestrating AI agents.
+Escape the limitations of no-code builders and the complexity of starting from scratch. +
+
+
+ +
+ +[![npm version](https://img.shields.io/npm/v/@voltagent/core.svg)](https://www.npmjs.com/package/@voltagent/core) +[![Contributor Covenant](https://img.shields.io/badge/Contributor%20Covenant-2.0-4baaaa.svg)](CODE_OF_CONDUCT.md) +[![Discord](https://img.shields.io/discord/1361559153780195478.svg?label=&logo=discord&logoColor=ffffff&color=7389D8&labelColor=6A7EC2)](https://s.voltagent.dev/discord) +[![Twitter Follow](https://img.shields.io/twitter/follow/voltagent_dev?style=social)](https://twitter.com/voltagent_dev) + +
+ +
+ +
+ +VoltAgent Schema + + +
+ +## VoltAgent: Build AI Agents Fast and Flexibly + +VoltAgent is an open-source TypeScript framework for creating and managing AI agents. It provides modular components to build, customize, and scale agents with ease. From connecting to APIs and memory management to supporting multiple LLMs, VoltAgent simplifies the process of creating sophisticated AI systems. It enables fast development, maintains clean code, and offers flexibility to switch between models and tools without vendor lock-in. + +## Try Example + +```bash +npm create voltagent-app@latest -- --example with-mongodb +``` diff --git a/examples/with-mongodb/package.json b/examples/with-mongodb/package.json new file mode 100644 index 000000000..ffd99057f --- /dev/null +++ b/examples/with-mongodb/package.json @@ -0,0 +1,41 @@ +{ + "name": "voltagent-example-with-mongodb", + "description": "VoltAgent example demonstrating MongoDB integration for memory.", + "author": "", + "dependencies": { + "@voltagent/cli": "^0.1.21", + "@voltagent/core": "^2.3.1", + "@voltagent/logger": "^2.0.2", + "@voltagent/mongodb": "^2.0.2", + "@voltagent/server-hono": "^2.0.4", + "ai": "^6.0.0", + "mongodb": "^7.0.0", + "zod": "^3.25.76" + }, + "devDependencies": { + "@types/node": "^24.2.1", + "tsx": "^4.19.3", + "typescript": "^5.8.2" + }, + "keywords": [ + "agent", + "ai", + "memory", + "mongodb", + "voltagent" + ], + "license": "MIT", + "private": true, + "repository": { + "type": "git", + "url": "https://github.com/VoltAgent/voltagent.git", + "directory": "examples/with-mongodb" + }, + "scripts": { + "build": "tsc", + "dev": "tsx watch --env-file=.env ./src", + "start": "node dist/index.js", + "volt": "volt" + }, + "type": "module" +} \ No newline at end of file diff --git a/examples/with-mongodb/src/index.ts b/examples/with-mongodb/src/index.ts new file mode 100644 index 000000000..9a9b8ff61 --- /dev/null +++ b/examples/with-mongodb/src/index.ts @@ -0,0 +1,42 @@ +import { Agent, Memory, VoltAgent } from "@voltagent/core"; +import { createPinoLogger } from "@voltagent/logger"; +import { MongoDBMemoryAdapter } from "@voltagent/mongodb"; +import { honoServer } from "@voltagent/server-hono"; + +// Configure MongoDB Memory +const memoryStorage = new MongoDBMemoryAdapter({ + // MongoDB connection URI + connection: process.env.MONGO_URI || "mongodb://localhost:27017", + + // Optional: Database name (default: "voltagent") + database: "voltagent", + + // Optional: Customize collection name prefix + collectionPrefix: "voltagent_memory", + + // Optional: Enable debug logging for storage + debug: process.env.NODE_ENV === "development", +}); + +const agent = new Agent({ + name: "MongoDB Memory Agent", + instructions: "A helpful assistant that remembers conversations using MongoDB.", + model: "openai/gpt-4o-mini", + memory: new Memory({ + storage: memoryStorage, + }), +}); + +// Create logger +const logger = createPinoLogger({ + name: "with-mongodb", + level: "info", +}); + +new VoltAgent({ + agents: { + agent, + }, + logger, + server: honoServer({ port: 3141 }), +}); diff --git a/examples/with-mongodb/tsconfig.json b/examples/with-mongodb/tsconfig.json new file mode 100644 index 000000000..f51a9b55e --- /dev/null +++ b/examples/with-mongodb/tsconfig.json @@ -0,0 +1,17 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "outDir": "./dist", + "rootDir": "./src", + "module": "ESNext", + "target": "ES2022", + "moduleResolution": "node" + }, + "include": [ + "src/**/*" + ], + "exclude": [ + "dist", + "node_modules" + ] +} \ No newline at end of file