Skip to content

Commit d5df801

Browse files
committed
feat: SQLBot supports custom data types in data sources #768
1 parent 164dbdd commit d5df801

File tree

6 files changed

+183
-10
lines changed

6 files changed

+183
-10
lines changed

backend/apps/datasource/api/datasource.py

Lines changed: 96 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,29 +8,29 @@
88
from typing import List
99
from urllib.parse import quote
1010

11-
import orjson
1211
import pandas as pd
13-
from psycopg2 import sql
1412
from fastapi import APIRouter, File, UploadFile, HTTPException, Path
1513
from fastapi.responses import StreamingResponse
14+
from psycopg2 import sql
1615
from sqlalchemy import and_
1716

1817
from apps.db.db import get_schema
1918
from apps.db.engine import get_engine_conn
2019
from apps.swagger.i18n import PLACEHOLDER_PREFIX
2120
from apps.system.schemas.permission import SqlbotPermission, require_permissions
21+
from common.audit.models.log_model import OperationType, OperationModules
22+
from common.audit.schemas.logger_decorator import LogConfig, system_log
2223
from common.core.config import settings
2324
from common.core.deps import SessionDep, CurrentUser, Trans
2425
from common.utils.utils import SQLBotLogUtil
2526
from ..crud.datasource import get_datasource_list, check_status, create_ds, update_ds, delete_ds, getTables, getFields, \
26-
execSql, update_table_and_fields, getTablesByDs, chooseTables, preview, updateTable, updateField, get_ds, fieldEnum, \
27+
update_table_and_fields, getTablesByDs, chooseTables, preview, updateTable, updateField, get_ds, fieldEnum, \
2728
check_status_by_id, sync_single_fields
2829
from ..crud.field import get_fields_by_table_id
2930
from ..crud.table import get_tables_by_ds_id
3031
from ..models.datasource import CoreDatasource, CreateDatasource, TableObj, CoreTable, CoreField, FieldObj, \
31-
TableSchemaResponse, ColumnSchemaResponse, PreviewResponse
32-
from common.audit.models.log_model import OperationType, OperationModules
33-
from common.audit.schemas.logger_decorator import LogConfig, system_log
32+
TableSchemaResponse, ColumnSchemaResponse, PreviewResponse, ImportRequest
33+
from ..utils.excel import parse_excel_preview, USER_TYPE_TO_PANDAS
3434

3535
router = APIRouter(tags=["Datasource"], prefix="/datasource")
3636
path = settings.EXCEL_PATH
@@ -81,20 +81,21 @@ async def add(session: SessionDep, trans: Trans, user: CurrentUser, ds: CreateDa
8181
8282
return await asyncio.to_thread(inner) """
8383
loop = asyncio.get_event_loop()
84-
84+
8585
def sync_wrapper():
8686
loop = asyncio.new_event_loop()
8787
asyncio.set_event_loop(loop)
8888
try:
8989
return loop.run_until_complete(create_ds(session, trans, user, ds))
9090
finally:
9191
loop.close()
92-
92+
9393
return await loop.run_in_executor(None, sync_wrapper)
9494

9595

9696
@router.post("/chooseTables/{id}", response_model=None, summary=f"{PLACEHOLDER_PREFIX}ds_choose_tables")
97-
@require_permissions(permission=SqlbotPermission(role=['ws_admin'], permission=SqlbotPermission(type='ds', keyExpression="id")))
97+
@require_permissions(
98+
permission=SqlbotPermission(role=['ws_admin'], permission=SqlbotPermission(type='ds', keyExpression="id")))
9899
async def choose_tables(session: SessionDep, trans: Trans, tables: List[CoreTable],
99100
id: int = Path(..., description=f"{PLACEHOLDER_PREFIX}ds_id")):
100101
def inner():
@@ -104,7 +105,8 @@ def inner():
104105

105106

106107
@router.post("/update", response_model=CoreDatasource, summary=f"{PLACEHOLDER_PREFIX}ds_update")
107-
@require_permissions(permission=SqlbotPermission(role=['ws_admin'], permission=SqlbotPermission(type='ds', keyExpression="ds.id")))
108+
@require_permissions(
109+
permission=SqlbotPermission(role=['ws_admin'], permission=SqlbotPermission(type='ds', keyExpression="ds.id")))
108110
@system_log(
109111
LogConfig(operation_type=OperationType.UPDATE, module=OperationModules.DATASOURCE, resource_id_expr="ds.id"))
110112
async def update(session: SessionDep, trans: Trans, user: CurrentUser, ds: CoreDatasource):
@@ -326,6 +328,7 @@ def inner():
326328
# return await asyncio.to_thread(inner)
327329

328330

331+
# deprecated
329332
@router.post("/uploadExcel", response_model=None, summary=f"{PLACEHOLDER_PREFIX}ds_upload_excel")
330333
async def upload_excel(session: SessionDep, file: UploadFile = File(..., description=f"{PLACEHOLDER_PREFIX}ds_excel")):
331334
ALLOWED_EXTENSIONS = {"xlsx", "xls", "csv"}
@@ -537,3 +540,86 @@ async def upload_ds_schema(session: SessionDep, id: int = Path(..., description=
537540
return True
538541
except Exception as e:
539542
raise HTTPException(status_code=500, detail=f"Parse Excel Failed: {str(e)}")
543+
544+
545+
@router.post("/parseExcel", response_model=None, summary=f"{PLACEHOLDER_PREFIX}ds_parse_excel")
async def parse_excel(file: UploadFile = File(..., description=f"{PLACEHOLDER_PREFIX}ds_excel")):
    """Save an uploaded spreadsheet and return a per-sheet field/data preview.

    The file is persisted under the configured EXCEL_PATH with a random
    suffix; the returned ``filePath`` is the handle a later /importToDb
    call uses to locate it.

    Raises:
        HTTPException 400: if the file is not a .xlsx/.xls/.csv.
    """
    ALLOWED_EXTENSIONS = {"xlsx", "xls", "csv"}
    # splitext (on the basename, so client-supplied path separators are
    # stripped) instead of a naive str.split('.'): multi-dot names like
    # "report.2024.xlsx" keep their real extension, and extension-less
    # names ("mycsv") are rejected here instead of raising IndexError below.
    base, ext = os.path.splitext(os.path.basename(file.filename or ""))
    ext = ext.lstrip(".").lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(400, "Only support .xlsx/.xls/.csv")

    os.makedirs(path, exist_ok=True)
    # Random suffix avoids collisions between concurrent uploads of files
    # that share a name.
    filename = f"{base}_{hashlib.sha256(uuid.uuid4().bytes).hexdigest()[:10]}.{ext}"
    save_path = os.path.join(path, filename)
    with open(save_path, "wb") as f:
        f.write(await file.read())

    def inner():
        # Parsing is CPU/disk heavy; keep it off the event loop.
        return {
            "filePath": filename,
            "data": parse_excel_preview(save_path),
        }

    return await asyncio.to_thread(inner)
565+
566+
567+
@router.post("/importToDb", response_model=None, summary=f"{PLACEHOLDER_PREFIX}ds_import_to_db")
async def import_to_db(session: SessionDep, import_req: ImportRequest):
    """Load a previously uploaded spreadsheet into the warehouse DB, one table per sheet.

    For each sheet the caller supplies a field->type mapping; the sheet is
    re-read with those dtypes, a fresh randomly-suffixed table is created,
    and the rows are bulk-loaded via PostgreSQL COPY.

    Raises:
        HTTPException 400: if the uploaded file is gone or a sheet fails to load.
    """
    save_path = os.path.join(path, import_req.filePath)
    if not os.path.exists(save_path):
        raise HTTPException(400, "File not found")

    def inner():
        engine = get_engine_conn()
        results = []

        for sheet_info in import_req.sheets:
            sheet_name = sheet_info.sheetName
            # Random suffix keeps repeated imports of the same sheet from colliding.
            table_name = f"{sheet_name}_{hashlib.sha256(uuid.uuid4().bytes).hexdigest()[:10]}"

            field_mapping = {f.fieldName: f.fieldType for f in sheet_info.fields}
            # Datetime columns must go through parse_dates: pandas rejects
            # 'datetime64[ns]' when passed via the dtype= argument of read_csv.
            datetime_cols = [c for c, t in field_mapping.items() if t == 'datetime']
            dtype_dict = {
                col: USER_TYPE_TO_PANDAS.get(user_type, 'string')
                for col, user_type in field_mapping.items()
                if user_type != 'datetime'
            }

            if save_path.endswith(".csv"):
                df = pd.read_csv(save_path, engine='c', dtype=dtype_dict,
                                 parse_dates=datetime_cols)
                sheet_name = "Sheet1"  # a CSV has no sheets; normalize the reported name
            else:
                df = pd.read_excel(save_path, sheet_name=sheet_name, engine='calamine',
                                   dtype=dtype_dict, parse_dates=datetime_cols)

            conn = engine.raw_connection()
            cursor = conn.cursor()
            try:
                # Create the (empty) target table from the frame's schema only.
                # The original passed the full frame here, which already loaded
                # every row through to_sql; COPY below is the intended fast path
                # and must be the only writer, or rows get duplicated.
                df.head(0).to_sql(
                    table_name,
                    engine,
                    if_exists='replace',
                    index=False
                )

                output = StringIO()
                df.to_csv(output, sep='\t', header=False, index=False)
                # Rewind before COPY: copy_expert reads from the current
                # position, which to_csv left at end-of-buffer.
                output.seek(0)

                query = sql.SQL("COPY {} FROM STDIN WITH CSV DELIMITER E'\t'").format(
                    sql.Identifier(table_name)
                )
                cursor.copy_expert(sql=query.as_string(cursor.connection), file=output)
                conn.commit()
                results.append({
                    "sheetName": sheet_name,
                    "tableName": table_name,
                    "tableComment": "",
                    "rows": len(df)
                })
            except Exception as e:
                conn.rollback()  # don't leave a half-finished transaction open
                raise HTTPException(400, f"Insert data failed for {table_name}: {str(e)}")
            finally:
                cursor.close()
                conn.close()

        return {"filename": import_req.filePath, "sheets": results}

    return await asyncio.to_thread(inner)

backend/apps/datasource/models/datasource.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,3 +192,18 @@ class PreviewResponse(BaseModel):
192192
fields: List | None = []
193193
data: List | None = []
194194
sql: str | None = ''
195+
196+
197+
class FieldInfo(BaseModel):
    """User-chosen type mapping for a single spreadsheet column."""
    # Column header exactly as it appears in the sheet.
    fieldName: str
    # User-facing type name (e.g. 'int' / 'float' / 'datetime' / 'string') —
    # presumably the keys of USER_TYPE_TO_PANDAS; verify against caller.
    fieldType: str


class SheetFields(BaseModel):
    """Per-sheet import configuration: which sheet, and its column types."""
    # Sheet name as reported by the preview endpoint ("Sheet1" for CSV).
    sheetName: str
    fields: List[FieldInfo]


class ImportRequest(BaseModel):
    """Request body for /datasource/importToDb."""
    # Server-side file handle returned by /datasource/parseExcel (relative name,
    # joined against the configured upload directory).
    filePath: str
    sheets: List[SheetFields]
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from .excel import (
2+
FIELD_TYPE_MAP,
3+
USER_TYPE_TO_PANDAS,
4+
infer_field_type,
5+
parse_excel_preview,
6+
)
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import pandas as pd
2+
3+
# Exact pandas/numpy dtype name -> user-facing type vocabulary shown in the
# upload preview UI. 'bool' deliberately maps to 'string'.
FIELD_TYPE_MAP = {
    'int64': 'int',
    'int32': 'int',
    'float64': 'float',
    'float32': 'float',
    'datetime64': 'datetime',
    'datetime64[ns]': 'datetime',
    'object': 'string',
    'string': 'string',
    'bool': 'string',
}

# Inverse direction: user-facing type -> pandas dtype used when re-reading
# the file during import.
USER_TYPE_TO_PANDAS = {
    'int': 'int64',
    'float': 'float64',
    'datetime': 'datetime64[ns]',
    'string': 'string',
}


def infer_field_type(dtype) -> str:
    """Map a pandas/numpy dtype to 'int' | 'float' | 'datetime' | 'string'.

    Exact matches in FIELD_TYPE_MAP take precedence (preserving the original
    mapping, including bool -> 'string'). Otherwise fall back to prefix
    heuristics so dtype variants not listed in the map — int16, uint8,
    nullable Int64, datetime64[us], tz-aware datetimes — still infer a
    usable type instead of silently degrading to 'string'.
    """
    dtype_str = str(dtype)
    if dtype_str in FIELD_TYPE_MAP:
        return FIELD_TYPE_MAP[dtype_str]
    lowered = dtype_str.lower()
    if lowered.startswith('datetime64'):
        return 'datetime'
    if lowered.startswith(('int', 'uint')):
        return 'int'
    if lowered.startswith('float'):
        return 'float'
    # Unknown dtypes (category, timedelta, ...) are treated as plain text.
    return 'string'
26+
27+
28+
def _sheet_preview(df: pd.DataFrame, sheet_name: str, max_rows: int) -> dict:
    # Build the per-sheet summary: inferred type per column, a capped row
    # preview, and the total (uncapped) row count.
    fields = [
        {"fieldName": col, "fieldType": infer_field_type(df[col].dtype)}
        for col in df.columns
    ]
    return {
        "sheetName": sheet_name,
        "fields": fields,
        "data": df.head(max_rows).to_dict(orient='records'),
        "rows": len(df),
    }


def parse_excel_preview(save_path: str, max_rows: int = 10):
    """Parse a saved .csv/.xls/.xlsx file into a list of per-sheet previews.

    Each entry holds ``sheetName``, ``fields`` (name + inferred type),
    ``data`` (at most *max_rows* records) and ``rows`` (total count).
    A CSV is reported as a single pseudo-sheet named "Sheet1" so the
    response shape is uniform across file types.
    """
    if save_path.endswith(".csv"):
        return [_sheet_preview(pd.read_csv(save_path, engine='c'), "Sheet1", max_rows)]
    # NOTE(review): 'calamine' is a third-party pandas Excel engine — assumed
    # installed in the deployment environment.
    return [
        _sheet_preview(
            pd.read_excel(save_path, sheet_name=sheet_name, engine='calamine'),
            sheet_name,
            max_rows,
        )
        for sheet_name in pd.ExcelFile(save_path).sheet_names
    ]

backend/apps/swagger/locales/en.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
"ds_preview_data": "Preview Data",
2323
"ds_upload_excel": "Upload Excel",
2424
"ds_excel": "File",
25+
"ds_parse_excel": "Parse Excel and Preview Data",
26+
"ds_import_to_db": "Import Data to Database",
2527
"ds_export_ds_schema": "Export Comment",
2628
"ds_upload_ds_schema": "Upload Comment",
2729

backend/apps/swagger/locales/zh.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
"ds_preview_data": "预览数据",
2323
"ds_upload_excel": "上传Excel",
2424
"ds_excel": "文件",
25+
"ds_parse_excel": "解析Excel并预览数据",
26+
"ds_import_to_db": "导入数据到数据库",
2527
"ds_export_ds_schema": "导出备注信息",
2628
"ds_upload_ds_schema": "导入备注信息",
2729

0 commit comments

Comments
 (0)