Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 96 additions & 10 deletions backend/apps/datasource/api/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,29 @@
from typing import List
from urllib.parse import quote

import orjson
import pandas as pd
from psycopg2 import sql
from fastapi import APIRouter, File, UploadFile, HTTPException, Path
from fastapi.responses import StreamingResponse
from psycopg2 import sql
from sqlalchemy import and_

from apps.db.db import get_schema
from apps.db.engine import get_engine_conn
from apps.swagger.i18n import PLACEHOLDER_PREFIX
from apps.system.schemas.permission import SqlbotPermission, require_permissions
from common.audit.models.log_model import OperationType, OperationModules
from common.audit.schemas.logger_decorator import LogConfig, system_log
from common.core.config import settings
from common.core.deps import SessionDep, CurrentUser, Trans
from common.utils.utils import SQLBotLogUtil
from ..crud.datasource import get_datasource_list, check_status, create_ds, update_ds, delete_ds, getTables, getFields, \
execSql, update_table_and_fields, getTablesByDs, chooseTables, preview, updateTable, updateField, get_ds, fieldEnum, \
update_table_and_fields, getTablesByDs, chooseTables, preview, updateTable, updateField, get_ds, fieldEnum, \
check_status_by_id, sync_single_fields
from ..crud.field import get_fields_by_table_id
from ..crud.table import get_tables_by_ds_id
from ..models.datasource import CoreDatasource, CreateDatasource, TableObj, CoreTable, CoreField, FieldObj, \
TableSchemaResponse, ColumnSchemaResponse, PreviewResponse
from common.audit.models.log_model import OperationType, OperationModules
from common.audit.schemas.logger_decorator import LogConfig, system_log
TableSchemaResponse, ColumnSchemaResponse, PreviewResponse, ImportRequest
from ..utils.excel import parse_excel_preview, USER_TYPE_TO_PANDAS

router = APIRouter(tags=["Datasource"], prefix="/datasource")
path = settings.EXCEL_PATH
Expand Down Expand Up @@ -81,20 +81,21 @@ async def add(session: SessionDep, trans: Trans, user: CurrentUser, ds: CreateDa

return await asyncio.to_thread(inner) """
loop = asyncio.get_event_loop()

def sync_wrapper():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(create_ds(session, trans, user, ds))
finally:
loop.close()

return await loop.run_in_executor(None, sync_wrapper)


@router.post("/chooseTables/{id}", response_model=None, summary=f"{PLACEHOLDER_PREFIX}ds_choose_tables")
@require_permissions(permission=SqlbotPermission(role=['ws_admin'], permission=SqlbotPermission(type='ds', keyExpression="id")))
@require_permissions(
permission=SqlbotPermission(role=['ws_admin'], permission=SqlbotPermission(type='ds', keyExpression="id")))
async def choose_tables(session: SessionDep, trans: Trans, tables: List[CoreTable],
id: int = Path(..., description=f"{PLACEHOLDER_PREFIX}ds_id")):
def inner():
Expand All @@ -104,7 +105,8 @@ def inner():


@router.post("/update", response_model=CoreDatasource, summary=f"{PLACEHOLDER_PREFIX}ds_update")
@require_permissions(permission=SqlbotPermission(role=['ws_admin'], permission=SqlbotPermission(type='ds', keyExpression="ds.id")))
@require_permissions(
permission=SqlbotPermission(role=['ws_admin'], permission=SqlbotPermission(type='ds', keyExpression="ds.id")))
@system_log(
LogConfig(operation_type=OperationType.UPDATE, module=OperationModules.DATASOURCE, resource_id_expr="ds.id"))
async def update(session: SessionDep, trans: Trans, user: CurrentUser, ds: CoreDatasource):
Expand Down Expand Up @@ -326,6 +328,7 @@ def inner():
# return await asyncio.to_thread(inner)


# deprecated
@router.post("/uploadExcel", response_model=None, summary=f"{PLACEHOLDER_PREFIX}ds_upload_excel")
async def upload_excel(session: SessionDep, file: UploadFile = File(..., description=f"{PLACEHOLDER_PREFIX}ds_excel")):
ALLOWED_EXTENSIONS = {"xlsx", "xls", "csv"}
Expand Down Expand Up @@ -537,3 +540,86 @@ async def upload_ds_schema(session: SessionDep, id: int = Path(..., description=
return True
except Exception as e:
raise HTTPException(status_code=500, detail=f"Parse Excel Failed: {str(e)}")


@router.post("/parseExcel", response_model=None, summary=f"{PLACEHOLDER_PREFIX}ds_parse_excel")
async def parse_excel(file: UploadFile = File(..., description=f"{PLACEHOLDER_PREFIX}ds_excel")):
    """Save an uploaded spreadsheet under EXCEL_PATH and return a per-sheet preview.

    Returns:
        dict with 'filePath' (server-side file name, later passed to
        /importToDb) and 'data' (per-sheet field types + first rows,
        as built by parse_excel_preview).

    Raises:
        HTTPException(400): for unsupported file extensions.
    """
    ALLOWED_EXTENSIONS = {".xlsx", ".xls", ".csv"}
    # splitext handles names with multiple dots (the previous split('.')
    # approach crashed with IndexError on dot-less names such as "mycsv",
    # which still passed the old endswith check, and truncated "a.b.xlsx");
    # basename strips any client-supplied directory components.
    base, ext = os.path.splitext(os.path.basename(file.filename))
    if ext.lower() not in ALLOWED_EXTENSIONS:
        raise HTTPException(400, "Only support .xlsx/.xls/.csv")

    os.makedirs(path, exist_ok=True)
    # Random suffix avoids collisions between uploads sharing the same name.
    suffix = hashlib.sha256(uuid.uuid4().bytes).hexdigest()[:10]
    filename = f"{base}_{suffix}{ext}"
    save_path = os.path.join(path, filename)
    with open(save_path, "wb") as f:
        f.write(await file.read())

    def inner():
        # Parsing is CPU/IO heavy; keep it off the event loop.
        sheets_data = parse_excel_preview(save_path)
        return {
            "filePath": filename,
            "data": sheets_data
        }

    return await asyncio.to_thread(inner)


@router.post("/importToDb", response_model=None, summary=f"{PLACEHOLDER_PREFIX}ds_import_to_db")
async def import_to_db(session: SessionDep, import_req: ImportRequest):
    """Load a previously-uploaded spreadsheet (see /parseExcel) into the database.

    For each requested sheet a new table is created (sheet name plus a random
    suffix) and the rows are bulk-loaded with PostgreSQL COPY.

    Returns:
        dict with 'filename' (the uploaded file) and 'sheets' (one entry per
        imported sheet: sheetName, tableName, tableComment, rows).

    Raises:
        HTTPException(400): if the file is missing or a sheet fails to import.
    """
    save_path = os.path.join(path, import_req.filePath)
    if not os.path.exists(save_path):
        raise HTTPException(400, "File not found")

    def inner():
        engine = get_engine_conn()
        results = []

        for sheet_info in import_req.sheets:
            sheet_name = sheet_info.sheetName
            # Random suffix keeps table names unique across repeated imports.
            table_name = f"{sheet_name}_{hashlib.sha256(uuid.uuid4().bytes).hexdigest()[:10]}"

            # Map each user-chosen field type to the pandas dtype used when
            # re-reading the file; unknown types default to 'string'.
            dtype_dict = {
                f.fieldName: USER_TYPE_TO_PANDAS.get(f.fieldType, 'string')
                for f in sheet_info.fields
            }

            if save_path.endswith(".csv"):
                df = pd.read_csv(save_path, engine='c', dtype=dtype_dict)
                sheet_name = "Sheet1"  # a CSV is a single implicit sheet
            else:
                df = pd.read_excel(save_path, sheet_name=sheet_name, engine='calamine', dtype=dtype_dict)

            conn = engine.raw_connection()
            cursor = conn.cursor()
            try:
                # Create (or replace) only the table schema: head(0) writes no
                # rows, so COPY below is the single source of the data.
                # (Writing the full frame here would duplicate every row once
                # COPY runs.)
                df.head(0).to_sql(
                    table_name,
                    engine,
                    if_exists='replace',
                    index=False
                )
                output = StringIO()
                df.to_csv(output, sep='\t', header=False, index=False)
                # Rewind before COPY: copy_expert reads from the current
                # position, and to_csv leaves the buffer at its end.
                output.seek(0)

                query = sql.SQL("COPY {} FROM STDIN WITH CSV DELIMITER E'\t'").format(
                    sql.Identifier(table_name)
                )
                cursor.copy_expert(sql=query.as_string(cursor.connection), file=output)
                conn.commit()
                results.append({
                    "sheetName": sheet_name,
                    "tableName": table_name,
                    "tableComment": "",
                    "rows": len(df)
                })
            except Exception as e:
                # Undo the partial load for this sheet before reporting.
                conn.rollback()
                raise HTTPException(400, f"Insert data failed for {table_name}: {str(e)}")
            finally:
                cursor.close()
                conn.close()

        return {"filename": import_req.filePath, "sheets": results}

    return await asyncio.to_thread(inner)
15 changes: 15 additions & 0 deletions backend/apps/datasource/models/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,18 @@ class PreviewResponse(BaseModel):
fields: List | None = []
data: List | None = []
sql: str | None = ''


class FieldInfo(BaseModel):
    # Column header name as parsed from the spreadsheet.
    fieldName: str
    # User-facing type label (keys of USER_TYPE_TO_PANDAS):
    # 'int' / 'float' / 'datetime' / 'string'.
    fieldType: str


class SheetFields(BaseModel):
    # Sheet (tab) name; CSV uploads use the implicit name "Sheet1".
    sheetName: str
    # Column definitions to apply when importing this sheet.
    fields: List[FieldInfo]


class ImportRequest(BaseModel):
    # Server-side file name returned by the parseExcel endpoint
    # (resolved relative to settings.EXCEL_PATH).
    filePath: str
    # Sheets to import, each with its per-column typing.
    sheets: List[SheetFields]
6 changes: 6 additions & 0 deletions backend/apps/datasource/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .excel import (
FIELD_TYPE_MAP,
USER_TYPE_TO_PANDAS,
infer_field_type,
parse_excel_preview,
)
62 changes: 62 additions & 0 deletions backend/apps/datasource/utils/excel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import pandas as pd

# Pandas dtype name -> coarse user-facing field type.
# Anything not listed (e.g. 'category') falls back to 'string'
# via infer_field_type().
FIELD_TYPE_MAP = {
    'int64': 'int',
    'int32': 'int',
    'float64': 'float',
    'float32': 'float',
    'datetime64': 'datetime',
    'datetime64[ns]': 'datetime',
    'object': 'string',
    'string': 'string',
    'bool': 'string',
}

# Inverse direction: user-chosen field type -> pandas dtype applied when
# re-reading the file for the actual import.
USER_TYPE_TO_PANDAS = {
    'int': 'int64',
    'float': 'float64',
    'datetime': 'datetime64[ns]',
    'string': 'string',
}


def infer_field_type(dtype) -> str:
    """Return the user-facing type label for a pandas dtype ('string' default)."""
    return FIELD_TYPE_MAP.get(str(dtype), 'string')


def _sheet_summary(df, sheet_name: str, max_rows: int) -> dict:
    """Build one sheet's preview payload: field types, first rows, row count."""
    fields = [
        {"fieldName": col, "fieldType": infer_field_type(df[col].dtype)}
        for col in df.columns
    ]
    return {
        "sheetName": sheet_name,
        "fields": fields,
        "data": df.head(max_rows).to_dict(orient='records'),
        "rows": len(df),
    }


def parse_excel_preview(save_path: str, max_rows: int = 10):
    """Parse a CSV or Excel file into per-sheet preview summaries.

    Args:
        save_path: path of the file saved by the upload endpoint.
        max_rows: number of preview rows returned per sheet.

    Returns:
        list of dicts, one per sheet, each with sheetName, fields
        (name + inferred type), data (first max_rows rows) and rows
        (total row count).
    """
    if save_path.endswith(".csv"):
        # A CSV is a single implicit sheet, exposed as "Sheet1".
        df = pd.read_csv(save_path, engine='c')
        return [_sheet_summary(df, "Sheet1", max_rows)]

    # NOTE(review): the 'calamine' engine requires python-calamine — confirm
    # it is in the project's dependencies.
    return [
        _sheet_summary(
            pd.read_excel(save_path, sheet_name=name, engine='calamine'),
            name,
            max_rows,
        )
        for name in pd.ExcelFile(save_path).sheet_names
    ]
2 changes: 2 additions & 0 deletions backend/apps/swagger/locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
"ds_preview_data": "Preview Data",
"ds_upload_excel": "Upload Excel",
"ds_excel": "File",
"ds_parse_excel": "Parse Excel and Preview Data",
"ds_import_to_db": "Import Data to Database",
"ds_export_ds_schema": "Export Comment",
"ds_upload_ds_schema": "Upload Comment",

Expand Down
2 changes: 2 additions & 0 deletions backend/apps/swagger/locales/zh.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
"ds_preview_data": "预览数据",
"ds_upload_excel": "上传Excel",
"ds_excel": "文件",
"ds_parse_excel": "解析Excel并预览数据",
"ds_import_to_db": "导入数据到数据库",
"ds_export_ds_schema": "导出备注信息",
"ds_upload_ds_schema": "导入备注信息",

Expand Down
Loading