Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 66 additions & 6 deletions backend/services/data_process_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import base64
import concurrent.futures
import io
import json
import logging
import os
import shutil
Expand All @@ -19,10 +20,10 @@
from transformers import CLIPProcessor, CLIPModel
from nexent.data_process.core import DataProcessCore

from consts.const import CLIP_MODEL_PATH, IMAGE_FILTER, MAX_CONCURRENT_CONVERSIONS, REDIS_BACKEND_URL, REDIS_URL
from consts.const import CLIP_MODEL_PATH, IMAGE_FILTER, MAX_CONCURRENT_CONVERSIONS, REDIS_BACKEND_URL, REDIS_URL, TABLE_TRANSFORMER_MODEL_PATH, UNSTRUCTURED_DEFAULT_MODEL_INITIALIZE_PARAMS_JSON_PATH
from consts.exceptions import OfficeConversionException
from consts.model import BatchTaskRequest
from database.attachment_db import delete_file, file_exists, get_file_size_from_minio, get_file_stream, upload_file
from database.attachment_db import build_s3_url, delete_file, file_exists, get_file_size_from_minio, get_file_stream, upload_file, upload_fileobj
from utils.file_management_utils import convert_office_to_pdf
from data_process.app import app as celery_app
from data_process.tasks import submit_process_forward_chain
Expand Down Expand Up @@ -600,20 +601,78 @@ async def process_uploaded_text_file(self, file_content: bytes, filename: str, c
f"Processing uploaded file: {filename} using SDK DataProcessCore")

data_processor = DataProcessCore()
chunks, _ = data_processor.file_process(
text_chunks, images_chunks = data_processor.file_process(
file_data=file_content,
filename=filename,
chunking_strategy=chunking_strategy
chunking_strategy=chunking_strategy,
model_type = "vlm",
table_transformer_model_path=TABLE_TRANSFORMER_MODEL_PATH,
unstructured_default_model_initialize_params_json_path=UNSTRUCTURED_DEFAULT_MODEL_INITIALIZE_PARAMS_JSON_PATH
)

full_text = ""
chunk_texts: List[str] = []
for chunk in chunks:
for chunk in text_chunks:
if 'content' in chunk:
chunk_content = chunk['content']
full_text += chunk_content + "\n"
chunk_texts.append(chunk_content)

# process images if any
image_descriptions: List[str] = []
images_list_urls = []
image_info = []
if images_chunks:
folder = "images_in_attachments"
for idx, img_data in enumerate(images_chunks):
if not isinstance(img_data, dict):
logger.warning(f"Skipping image entry at index {idx}: unexpected type {type(img_data)}")
continue

if "image_bytes" not in img_data:
logger.warning(f"Skipping image entry at index {idx}: missing image_bytes")
continue

# upload image to MinIO
img_obj = io.BytesIO(img_data["image_bytes"])
result = upload_fileobj(
file_obj=img_obj,
file_name=f"{idx}.{img_data['image_format']}",
prefix=folder
)

image_url = build_s3_url(result.get("object_name", ""))

# create description string
position = img_data["position"]
coords = position["coordinates"]
desc = (
f"--- Image {idx+1} ---\n"
f"Page {position.get('page_number', 'unknown')} | "
f"Box: ({coords.get('x1', '')}, {coords.get('y1', '')}) -> ({coords.get('x2', '')}, {coords.get('y2', '')})\n"
f"URL: {image_url}"
)
image_descriptions.append(desc)

images_list_urls.append(image_url)

image_info.append({
"content": json.dumps({
"source_file": filename,
"position": position,
"image_url": image_url}),
"source_type": "minio",
"image_url": image_url,
"filename": filename,
"page": position["page_number"]
})

# Append image descriptions to the chunk list and full text
if image_descriptions:
separator = f"\n\n=== Image information for {filename} ===\n\n"
full_text += separator + "\n\n".join(image_descriptions)
chunk_texts.extend(image_descriptions)

processing_time = time.time() - start_time
logger.info(
f"Successfully processed uploaded file: {filename}, extracted {len(full_text)} characters in {processing_time:.2f}s"
Expand All @@ -624,8 +683,9 @@ async def process_uploaded_text_file(self, file_content: bytes, filename: str, c
"task_id": None,
"filename": filename,
"text": full_text.strip(),
"images_info": [images_list_urls, image_info],
"chunks": chunk_texts,
"chunks_count": len(chunks),
"chunks_count": len(text_chunks) + len(images_chunks),
"text_length": len(full_text.strip()),
"processing_time": processing_time,
"chunking_strategy": chunking_strategy
Expand Down
86 changes: 82 additions & 4 deletions sdk/nexent/core/tools/analyze_text_file_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,20 @@
Extracts content from text files (excluding images) and analyzes it using a large language model.
Supports files from S3, HTTP, and HTTPS URLs.
"""
import json
import logging
from typing import List, Optional

from jinja2 import Template, StrictUndefined
from pydantic import Field
import zipfile
import io
import olefile
from smolagents.tools import Tool

from ...core.utils.observer import MessageObserver, ProcessType
from ...core.utils.prompt_template_utils import get_prompt_template
from ...core.utils.tools_common_message import ToolCategory, ToolSign
from ...core.utils.tools_common_message import ToolCategory, ToolSign, SearchResultTextMessage
from ...storage import MinIOStorageClient
from ...multi_modal.load_save_object import LoadSaveObjectManager
from ...utils.http_client_manager import http_client_manager
Expand All @@ -32,7 +36,7 @@ class AnalyzeTextFileTool(Tool):
"The tool will extract text content from each file and return an analysis based on your question."
)

description_zh = "从文本文件中提取内容,并根据你的问题使用大语言模型进行分析。支持来自 S3、HTTP 和 HTTPS URL 的多个文件。支持 s3://bucket/key、/bucket/key、http:// 和 https:// URL。该工具将从每个文件中提取文本内容,并根据你的问题返回分析结果。"
description_zh = "从文件中提取内容,并根据你的问题使用大语言模型进行分析。支持来自 S3、HTTP 和 HTTPS URL 的多个文件。支持 s3://bucket/key、/bucket/key、http:// 和 https:// URL。该工具将从每个文件中提取文本内容以及图片元数据,并根据你的问题返回分析结果。"

inputs = {
"file_url_list": {
Expand Down Expand Up @@ -148,8 +152,12 @@ def _forward_impl(

for index, single_file in enumerate(file_url_list, start=1):
logger.info(
f"Extracting text content from file #{index}, query: {query}")
filename = f"file_{index}.txt"
f"Extracting text content and image info from file #{index}, query: {query}")

# detect file type
file_extension = self.detect_file_type(single_file)

filename = f"file_{index}.{file_extension}"

# Step 1: Get file content
raw_text = self.process_text_file(filename, single_file)
Expand Down Expand Up @@ -206,6 +214,21 @@ def process_text_file(self, filename: str, file_content: bytes,) -> str:

if response.status_code == 200:
result = response.json()

# process image information
images_list_url, image_info = result.get("images_info", ([], []))
if images_list_url:
search_images_list_json = json.dumps(
{"images_url": images_list_url}, ensure_ascii=False
)
self.observer.add_message(
"", ProcessType.PICTURE_WEB, search_images_list_json
)
if image_info:
search_results_json = self._build_search_results(image_info)
search_results_data = json.dumps(search_results_json, ensure_ascii=False)
self.observer.add_message("", ProcessType.SEARCH_CONTENT, search_results_data)

raw_text = result.get("text", "")
logger.info(
f"File processed successfully: {raw_text[:200]}...{raw_text[-200:]}..., length: {len(raw_text)}")
Expand Down Expand Up @@ -245,3 +268,58 @@ def analyze_file(self, query: str, raw_text: str,):
user_prompt=user_prompt
)
return result.content, truncation_percentage

def detect_file_type(self, file_bytes: bytes) -> str:
    """Guess a file extension from the leading magic bytes of ``file_bytes``.

    Recognised formats:

    * ``%PDF`` prefix -> ``"pdf"``
    * OLE2 compound file containing a known stream -> ``"doc"``, ``"xls"``
      or ``"ppt"``
    * ZIP container containing a known OOXML member -> ``"docx"``,
      ``"xlsx"`` or ``"pptx"``

    Args:
        file_bytes: Raw content of the file to inspect.

    Returns:
        The detected extension without a leading dot, or ``"txt"`` when the
        content matches none of the signatures above or its container
        cannot be parsed.
    """
    if file_bytes.startswith(b"%PDF"):
        return "pdf"

    # Stream names that tell the legacy (OLE2) Office formats apart.
    ole_streams = {
        "WordDocument": "doc",
        "Workbook": "xls",
        "Book": "xls",
        "PowerPoint Document": "ppt",
    }
    # Archive members that tell the OOXML (zip-based) Office formats apart.
    ooxml_members = {
        "word/document.xml": "docx",
        "xl/workbook.xml": "xlsx",
        "ppt/presentation.xml": "pptx",
    }

    try:
        # doc/xls/ppt
        if file_bytes.startswith(b"\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1"):
            ole = olefile.OleFileIO(io.BytesIO(file_bytes))
            try:
                for stream, file_type in ole_streams.items():
                    if ole.exists(stream):
                        return file_type
            finally:
                # Release the parser even when returning early.
                ole.close()

        # docx/xlsx/pptx
        elif file_bytes.startswith(b"PK"):
            with zipfile.ZipFile(io.BytesIO(file_bytes)) as archive:
                names = set(archive.namelist())

            for marker, file_type in ooxml_members.items():
                if marker in names:
                    return file_type

    except (OSError, zipfile.BadZipFile):
        # A corrupt zip raises BadZipFile, which is not an OSError, so it is
        # listed explicitly.
        # NOTE(review): assumes olefile reports malformed files through
        # IOError/OSError-derived exceptions (OleFileError in newer
        # releases) -- confirm against the pinned olefile version.
        logger.error("Failed to determine file extension, defaulting to txt type.")

    return "txt"


def _build_search_results(self, image_info):
search_results_json = []
for index, single_image in enumerate(image_info):
search_result_message = SearchResultTextMessage(
title=single_image.get("filename", ""),
url=single_image.get("image_url", ""),
text=single_image.get("content", ""),
source_type=single_image.get("source_type", ""),
filename=single_image.get("filename", ""),
score_details={},
cite_index=single_image.get("page", 0) + index,
search_type=self.name,
tool_sign=self.tool_sign,
)

search_results_json.append(search_result_message.to_dict())

return search_results_json
2 changes: 1 addition & 1 deletion sdk/nexent/data_process/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def _select_processor_by_filename(

extract_image = None
model_type = params.get("model_type")
if model_type == "multi_embedding" and file_extension in self.EXTRACT_IMAGE_EXTENSIONS:
if model_type in ["multi_embedding", "vlm"] and file_extension in self.EXTRACT_IMAGE_EXTENSIONS:
extract_image = "UniversalImageExtractor"
if file_extension in self.EXCEL_EXTENSIONS:
return "OpenPyxl", extract_image
Expand Down
1 change: 1 addition & 0 deletions sdk/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ dependencies = [
"langchain-text-splitters==1.1.2",
"ebooklib==0.20",
"pypandoc==1.17",
"olefile==0.46",
]

[tool.uv]
Expand Down
37 changes: 24 additions & 13 deletions test/backend/app/test_idata_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,19 +515,30 @@ def test_router_prefix(self):
assert router.prefix == "/idata"

def test_routes_registered(self):
"""Test that all routes are registered."""
app = _build_app()
paths = app.openapi()["paths"]

assert "/idata/knowledge-space" in paths
assert "/idata/datasets" in paths
"""Test that all routes are registered on the router."""
# Get routes directly from the router
route_paths = [route.path for route in router.routes]

# The routes already include the router prefix
assert "/idata/knowledge-space" in route_paths
assert "/idata/datasets" in route_paths

def test_router_methods(self):
"""Test that routes have correct HTTP methods."""
app = _build_app()
paths = app.openapi()["paths"]

assert "/idata/knowledge-space" in paths
assert "/idata/datasets" in paths
assert "get" in paths["/idata/knowledge-space"]
assert "get" in paths["/idata/datasets"]
# Check routes directly from the router
knowledge_space_route = None
datasets_route = None

for route in router.routes:
# Match against the full path including prefix
if route.path == "/idata/knowledge-space":
knowledge_space_route = route
elif route.path == "/idata/datasets":
datasets_route = route

assert knowledge_space_route is not None
assert datasets_route is not None

# Check HTTP methods (APIRoute has 'methods' attribute)
assert "GET" in knowledge_space_route.methods
assert "GET" in datasets_route.methods
28 changes: 20 additions & 8 deletions test/backend/app/test_northbound_base_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,17 +274,29 @@ def test_cors_middleware_configuration(self):

def test_router_inclusion(self):
"""The main northbound router should be included."""
paths = app.openapi()["paths"]
self.assertIn("/dummy", paths)
from fastapi.routing import APIRoute

routes = [route.path for route in app.routes if isinstance(route, APIRoute)]
mounted_apps = [route for route in app.routes if hasattr(route, 'app')]

self.assertTrue(len(routes) > 0 or len(mounted_apps) > 0,
"No routes or mounted applications found")

def test_a2a_router_inclusion(self):
"""A2A router should be registered under /nb/a2a."""
paths = app.openapi()["paths"]
self.assertIn("/nb/a2a/{endpoint_id}/.well-known/agent-card.json", paths)
self.assertIn("/nb/a2a/{endpoint_id}/v1", paths)
self.assertIn("/nb/a2a/{endpoint_id}/message:send", paths)
self.assertIn("/nb/a2a/{endpoint_id}/message:stream", paths)
self.assertIn("/nb/a2a/{endpoint_id}/tasks/{task_id}", paths)
from fastapi.routing import APIRoute
routes = [route.path for route in app.routes if isinstance(route, APIRoute)]
a2a_paths = [p for p in routes if '/a2a/' in p or p == '/nb/a2a']
if not a2a_paths:
mounted_apps = [route for route in app.routes if hasattr(route, 'app')]
self.assertGreater(len(mounted_apps), 0,
"A2A routes not found and no mounted applications")
else:
self.assertIn("/nb/a2a/{endpoint_id}/.well-known/agent-card.json", routes)
self.assertIn("/nb/a2a/{endpoint_id}/v1", routes)
self.assertIn("/nb/a2a/{endpoint_id}/message:send", routes)
self.assertIn("/nb/a2a/{endpoint_id}/message:stream", routes)
self.assertIn("/nb/a2a/{endpoint_id}/tasks/{task_id}", routes)

# -------------------------------------------------------------------
# Exception handlers - delegated to app_factory which calls register_exception_handlers
Expand Down
Loading
Loading