88from typing import List
99from urllib .parse import quote
1010
11- import orjson
1211import pandas as pd
13- from psycopg2 import sql
1412from fastapi import APIRouter , File , UploadFile , HTTPException , Path
1513from fastapi .responses import StreamingResponse
14+ from psycopg2 import sql
1615from sqlalchemy import and_
1716
1817from apps .db .db import get_schema
1918from apps .db .engine import get_engine_conn
2019from apps .swagger .i18n import PLACEHOLDER_PREFIX
2120from apps .system .schemas .permission import SqlbotPermission , require_permissions
21+ from common .audit .models .log_model import OperationType , OperationModules
22+ from common .audit .schemas .logger_decorator import LogConfig , system_log
2223from common .core .config import settings
2324from common .core .deps import SessionDep , CurrentUser , Trans
2425from common .utils .utils import SQLBotLogUtil
2526from ..crud .datasource import get_datasource_list , check_status , create_ds , update_ds , delete_ds , getTables , getFields , \
26- execSql , update_table_and_fields , getTablesByDs , chooseTables , preview , updateTable , updateField , get_ds , fieldEnum , \
27+ update_table_and_fields , getTablesByDs , chooseTables , preview , updateTable , updateField , get_ds , fieldEnum , \
2728 check_status_by_id , sync_single_fields
2829from ..crud .field import get_fields_by_table_id
2930from ..crud .table import get_tables_by_ds_id
3031from ..models .datasource import CoreDatasource , CreateDatasource , TableObj , CoreTable , CoreField , FieldObj , \
31- TableSchemaResponse , ColumnSchemaResponse , PreviewResponse
32- from common .audit .models .log_model import OperationType , OperationModules
33- from common .audit .schemas .logger_decorator import LogConfig , system_log
32+ TableSchemaResponse , ColumnSchemaResponse , PreviewResponse , ImportRequest
33+ from ..utils .excel import parse_excel_preview , USER_TYPE_TO_PANDAS
3434
# FastAPI sub-router for every datasource endpoint; mounted under /datasource.
router = APIRouter(tags=["Datasource"], prefix="/datasource")
# Directory where uploaded Excel/CSV files are stored (from app settings).
path = settings.EXCEL_PATH
@@ -81,20 +81,21 @@ async def add(session: SessionDep, trans: Trans, user: CurrentUser, ds: CreateDa
8181
8282 return await asyncio.to_thread(inner) """
8383 loop = asyncio .get_event_loop ()
84-
84+
8585 def sync_wrapper ():
8686 loop = asyncio .new_event_loop ()
8787 asyncio .set_event_loop (loop )
8888 try :
8989 return loop .run_until_complete (create_ds (session , trans , user , ds ))
9090 finally :
9191 loop .close ()
92-
92+
9393 return await loop .run_in_executor (None , sync_wrapper )
9494
9595
9696@router .post ("/chooseTables/{id}" , response_model = None , summary = f"{ PLACEHOLDER_PREFIX } ds_choose_tables" )
97- @require_permissions (permission = SqlbotPermission (role = ['ws_admin' ], permission = SqlbotPermission (type = 'ds' , keyExpression = "id" )))
97+ @require_permissions (
98+ permission = SqlbotPermission (role = ['ws_admin' ], permission = SqlbotPermission (type = 'ds' , keyExpression = "id" )))
9899async def choose_tables (session : SessionDep , trans : Trans , tables : List [CoreTable ],
99100 id : int = Path (..., description = f"{ PLACEHOLDER_PREFIX } ds_id" )):
100101 def inner ():
@@ -104,7 +105,8 @@ def inner():
104105
105106
106107@router .post ("/update" , response_model = CoreDatasource , summary = f"{ PLACEHOLDER_PREFIX } ds_update" )
107- @require_permissions (permission = SqlbotPermission (role = ['ws_admin' ], permission = SqlbotPermission (type = 'ds' , keyExpression = "ds.id" )))
108+ @require_permissions (
109+ permission = SqlbotPermission (role = ['ws_admin' ], permission = SqlbotPermission (type = 'ds' , keyExpression = "ds.id" )))
108110@system_log (
109111 LogConfig (operation_type = OperationType .UPDATE , module = OperationModules .DATASOURCE , resource_id_expr = "ds.id" ))
110112async def update (session : SessionDep , trans : Trans , user : CurrentUser , ds : CoreDatasource ):
@@ -326,6 +328,7 @@ def inner():
326328# return await asyncio.to_thread(inner)
327329
328330
331+ # deprecated
329332@router .post ("/uploadExcel" , response_model = None , summary = f"{ PLACEHOLDER_PREFIX } ds_upload_excel" )
330333async def upload_excel (session : SessionDep , file : UploadFile = File (..., description = f"{ PLACEHOLDER_PREFIX } ds_excel" )):
331334 ALLOWED_EXTENSIONS = {"xlsx" , "xls" , "csv" }
@@ -537,3 +540,86 @@ async def upload_ds_schema(session: SessionDep, id: int = Path(..., description=
537540 return True
538541 except Exception as e :
539542 raise HTTPException (status_code = 500 , detail = f"Parse Excel Failed: { str (e )} " )
543+
544+
@router.post("/parseExcel", response_model=None, summary=f"{PLACEHOLDER_PREFIX}ds_parse_excel")
async def parse_excel(file: UploadFile = File(..., description=f"{PLACEHOLDER_PREFIX}ds_excel")):
    """Persist an uploaded Excel/CSV file and return a per-sheet preview.

    The file is stored under ``settings.EXCEL_PATH`` with a random suffix so
    concurrent uploads of identically named files cannot collide. The stored
    name is returned as ``filePath`` for the follow-up ``/importToDb`` call,
    together with the preview data produced by ``parse_excel_preview``.

    Raises HTTPException(400) for unsupported file extensions.
    """
    ALLOWED_EXTENSIONS = {"xlsx", "xls", "csv"}
    # basename + splitext instead of naive str.split('.'): strips any path
    # components a malicious client put into the filename (path traversal via
    # os.path.join below), handles names containing multiple dots correctly,
    # and does not crash on names without any dot.
    base = os.path.basename(file.filename or "")
    stem, ext = os.path.splitext(base)
    ext = ext.lstrip(".").lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(400, "Only support .xlsx/.xls/.csv")

    os.makedirs(path, exist_ok=True)
    # Random, unguessable suffix keeps stored names unique per upload.
    suffix = hashlib.sha256(uuid.uuid4().bytes).hexdigest()[:10]
    filename = f"{stem}_{suffix}.{ext}"
    save_path = os.path.join(path, filename)
    with open(save_path, "wb") as f:
        f.write(await file.read())

    def inner():
        # Parsing is blocking (disk + CPU); keep it off the event loop.
        sheets_data = parse_excel_preview(save_path)
        return {
            "filePath": filename,
            "data": sheets_data
        }

    return await asyncio.to_thread(inner)
565+
566+
@router.post("/importToDb", response_model=None, summary=f"{PLACEHOLDER_PREFIX}ds_import_to_db")
async def import_to_db(session: SessionDep, import_req: ImportRequest):
    """Bulk-load previously uploaded sheets into database tables.

    For every requested sheet the data is re-read with the user-chosen column
    dtypes, an *empty* table is created via pandas (schema only), and the rows
    are then streamed in with ``COPY FROM STDIN`` — far faster than row-wise
    INSERTs for large sheets.

    Returns the stored file name plus, per sheet, the generated table name and
    the number of imported rows. Raises HTTPException(400) when the uploaded
    file is missing or a sheet fails to load.
    """
    save_path = os.path.join(path, import_req.filePath)
    if not os.path.exists(save_path):
        raise HTTPException(400, "File not found")

    def inner():
        engine = get_engine_conn()
        results = []

        for sheet_info in import_req.sheets:
            sheet_name = sheet_info.sheetName
            # Random suffix keeps repeated imports of the same sheet from clashing.
            table_name = f"{sheet_name}_{hashlib.sha256(uuid.uuid4().bytes).hexdigest()[:10]}"
            fields = sheet_info.fields

            # Map the user-selected field types onto pandas dtypes ('string' default).
            field_mapping = {f.fieldName: f.fieldType for f in fields}
            dtype_dict = {
                col: USER_TYPE_TO_PANDAS.get(field_mapping.get(col, 'string'), 'string')
                for col in field_mapping.keys()
            }

            if save_path.endswith(".csv"):
                df = pd.read_csv(save_path, engine='c', dtype=dtype_dict)
                sheet_name = "Sheet1"
            else:
                df = pd.read_excel(save_path, sheet_name=sheet_name, engine='calamine', dtype=dtype_dict)

            conn = engine.raw_connection()
            cursor = conn.cursor()
            try:
                # Create the (empty) target table only — head(0) carries the
                # column schema but no rows. Passing the full frame to to_sql
                # here would insert every row once, and the COPY below would
                # then insert them all a second time.
                df.head(0).to_sql(
                    table_name,
                    engine,
                    if_exists='replace',
                    index=False
                )
                output = StringIO()
                df.to_csv(output, sep='\t', header=False, index=False)
                # Rewind: copy_expert reads from the buffer's current position,
                # which to_csv left at EOF — without this, COPY loads nothing.
                output.seek(0)

                query = sql.SQL("COPY {} FROM STDIN WITH CSV DELIMITER E'\t'").format(
                    sql.Identifier(table_name)
                )
                cursor.copy_expert(sql=query.as_string(cursor.connection), file=output)
                conn.commit()
                results.append({
                    "sheetName": sheet_name,
                    "tableName": table_name,
                    "tableComment": "",
                    "rows": len(df)
                })
            except Exception as e:
                # Discard the partial COPY before surfacing the error.
                conn.rollback()
                raise HTTPException(400, f"Insert data failed for {table_name}: {str(e)}") from e
            finally:
                cursor.close()
                conn.close()

        return {"filename": import_req.filePath, "sheets": results}

    return await asyncio.to_thread(inner)
0 commit comments