55import json
66import os
77import re
8+ from typing import Dict , List , Union
89
9- from typing import List , Dict , Union
1010from edgecraftrag .api .v1 .data import add_data
11- from edgecraftrag .api_schema import DataIn , KnowledgeBaseCreateIn , ExperienceIn
11+ from edgecraftrag .api_schema import DataIn , ExperienceIn , KnowledgeBaseCreateIn
1212from edgecraftrag .base import IndexerType
13+ from edgecraftrag .components .query_preprocess import query_search
14+ from edgecraftrag .components .retriever import get_kbs_info
1315from edgecraftrag .context import ctx
1416from edgecraftrag .utils import compare_mappings
15- from edgecraftrag .components .retriever import get_kbs_info
16- from edgecraftrag .components .query_preprocess import query_search
1717from fastapi import FastAPI , HTTPException , status
1818from pymilvus .exceptions import MilvusException
1919
2323KNOWLEDGE_BASE_ROOT = "/home/user/ui_cache"
2424CONFIG_DIR = "/home/user/ui_cache/configs"
2525
26+
2627# Get all knowledge bases
2728@kb_app .get (path = "/v1/knowledge" )
2829async def get_all_knowledge_bases ():
@@ -52,7 +53,7 @@ async def create_knowledge_base(knowledge: KnowledgeBaseCreateIn):
5253 detail = "Knowledge base names must begin with a letter or underscore" ,
5354 )
5455
55- if knowledge .active and knowledge .comp_type == "knowledge" and knowledge .comp_subtype == "origin_kb" :
56+ if knowledge .active and knowledge .comp_type == "knowledge" and knowledge .comp_subtype == "origin_kb" :
5657 active_pl .indexer .reinitialize_indexer (knowledge .name )
5758 active_pl .update_indexer_to_retriever ()
5859 elif knowledge .comp_subtype == "kbadmin_kb" :
@@ -74,7 +75,10 @@ async def delete_knowledge_base(knowledge_name: str):
7475 if rm_kb .comp_type == "knowledge" and rm_kb .comp_subtype == "origin_kb" :
7576 if active_kb :
7677 if active_kb .name == knowledge_name or active_kb .idx == knowledge_name :
77- raise HTTPException (status_code = status .HTTP_500_INTERNAL_SERVER_ERROR , detail = "Cannot delete a running knowledge base." )
78+ raise HTTPException (
79+ status_code = status .HTTP_500_INTERNAL_SERVER_ERROR ,
80+ detail = "Cannot delete a running knowledge base." ,
81+ )
7882 kb_file_path = rm_kb .get_file_paths ()
7983 if kb_file_path :
8084 if active_pl .indexer .comp_subtype == "milvus_vector" :
@@ -84,7 +88,10 @@ async def delete_knowledge_base(knowledge_name: str):
8488 active_pl .update_indexer_to_retriever ()
8589 if rm_kb .comp_type == "experience" :
8690 if rm_kb .experience_active :
87- raise HTTPException (status_code = status .HTTP_500_INTERNAL_SERVER_ERROR , detail = "Cannot delete a running experience knowledge base." )
91+ raise HTTPException (
92+ status_code = status .HTTP_500_INTERNAL_SERVER_ERROR ,
93+ detail = "Cannot delete a running experience knowledge base." ,
94+ )
8895 else :
8996 rm_kb .clear_experiences ()
9097 result = ctx .knowledgemgr .delete_knowledge_base (knowledge_name )
@@ -129,10 +136,15 @@ async def add_file_to_knowledge_base(knowledge_name, file_path: DataIn):
129136 try :
130137 active_pl = ctx .get_pipeline_mgr ().get_active_pipeline ()
131138 kb = ctx .knowledgemgr .get_knowledge_base_by_name_or_id (knowledge_name )
132- if kb .comp_type == "experience" :
133- raise HTTPException (status_code = status .HTTP_404_NOT_FOUND , detail = "The experience type cannot perform file operations." )
139+ if kb .comp_type == "experience" :
140+ raise HTTPException (
141+ status_code = status .HTTP_404_NOT_FOUND , detail = "The experience type cannot perform file operations."
142+ )
134143 if kb .comp_subtype == "kbadmin_kb" or active_pl .indexer .comp_subtype == "kbadmin_indexer" :
135- raise HTTPException (status_code = status .HTTP_404_NOT_FOUND , detail = "Please proceed to the kbadmin interface to perform the operation." )
144+ raise HTTPException (
145+ status_code = status .HTTP_404_NOT_FOUND ,
146+ detail = "Please proceed to the kbadmin interface to perform the operation." ,
147+ )
136148 # Validate and normalize the user-provided path
137149 user_path = file_path .local_path
138150 normalized_path = os .path .normpath (os .path .join (KNOWLEDGE_BASE_ROOT , user_path ))
@@ -184,10 +196,15 @@ async def remove_file_from_knowledge_base(knowledge_name, file_path: DataIn):
184196 try :
185197 active_pl = ctx .get_pipeline_mgr ().get_active_pipeline ()
186198 kb = ctx .knowledgemgr .get_knowledge_base_by_name_or_id (knowledge_name )
187- if kb .comp_type == "experience" :
188- raise HTTPException (status_code = status .HTTP_404_NOT_FOUND , detail = "The experience type cannot perform file operations." )
199+ if kb .comp_type == "experience" :
200+ raise HTTPException (
201+ status_code = status .HTTP_404_NOT_FOUND , detail = "The experience type cannot perform file operations."
202+ )
189203 if kb .comp_subtype == "kbadmin_kb" or active_pl .indexer .comp_subtype == "kbadmin_indexer" :
190- raise HTTPException (status_code = status .HTTP_404_NOT_FOUND , detail = "Please proceed to the kbadmin interface to perform the operation." )
204+ raise HTTPException (
205+ status_code = status .HTTP_404_NOT_FOUND ,
206+ detail = "Please proceed to the kbadmin interface to perform the operation." ,
207+ )
191208 active_kb = ctx .knowledgemgr .get_active_knowledge_base ()
192209 if file_path .local_path in kb .get_file_paths ():
193210 kb .remove_file_path (file_path .local_path )
@@ -215,9 +232,10 @@ async def remove_file_from_knowledge_base(knowledge_name, file_path: DataIn):
215232 except ValueError as e :
216233 raise HTTPException (status_code = status .HTTP_404_NOT_FOUND , detail = str (e ))
217234
235+
218236@kb_app .post ("/v1/experience" )
219237def get_experience_by_question (req : ExperienceIn ):
220- kb = ctx .knowledgemgr .get_experience_kb ()
238+ kb = ctx .knowledgemgr .get_experience_kb ()
221239 result = kb .get_experience_by_question (req .question )
222240 if not result :
223241 raise HTTPException (404 , detail = "Experience not found" )
@@ -238,41 +256,41 @@ def update_experience(experience: ExperienceIn):
238256 kb = ctx .knowledgemgr .get_experience_kb ()
239257 result = kb .update_experience (experience .question , experience .content )
240258 if not result :
241- raise HTTPException (404 , detail = f "Question not found" )
259+ raise HTTPException (404 , detail = "Question not found" )
242260 return result
243261
244262
245263@kb_app .delete ("/v1/experiences" )
246- def delete_experience (req : ExperienceIn ):
264+ def delete_experience (req : ExperienceIn ):
247265 kb = ctx .knowledgemgr .get_experience_kb ()
248266 success = kb .delete_experience (req .question )
249267 if not success :
250268 raise HTTPException (404 , detail = f"Question { req .question } not found" )
251- return {"message" : f "Question deleted" }
269+ return {"message" : "Question deleted" }
252270
253271
254272@kb_app .post ("/v1/multiple_experiences/check" )
255273def check_duplicate_multiple_experiences (experiences : List [Dict [str , Union [str , List [str ]]]]):
256274 kb = ctx .knowledgemgr .get_experience_kb ()
257275 if not kb :
258- raise HTTPException (404 , detail = f "No active experience type knowledge base" )
276+ raise HTTPException (404 , detail = "No active experience type knowledge base" )
259277 all_existing = kb .get_all_experience ()
260278 existing_questions = {item ["question" ] for item in all_existing }
261279 new_questions = [exp ["question" ] for exp in experiences if "question" in exp ]
262280 duplicate_questions = [q for q in new_questions if q in existing_questions ]
263281 if duplicate_questions :
264282 return {"code" : 2001 , "detail" : "Duplicate experiences are appended OR overwritten!" }
265283 else :
266- kb .add_multiple_experiences (experiences = experiences , flag = True )
267- return {"status" : "success" ,"detail" : "No duplicate experiences, added successfully" }
284+ kb .add_multiple_experiences (experiences = experiences , flag = True )
285+ return {"status" : "success" , "detail" : "No duplicate experiences, added successfully" }
268286
269287
270288@kb_app .post ("/v1/multiple_experiences/confirm" )
271- def confirm_multiple_experiences (experiences : List [Dict [str , Union [str , List [str ]]]],flag : bool ):
289+ def confirm_multiple_experiences (experiences : List [Dict [str , Union [str , List [str ]]]], flag : bool ):
272290 kb = ctx .knowledgemgr .get_experience_kb ()
273291 try :
274292 if not kb :
275- raise HTTPException (404 , detail = f "No active experience type knowledge base" )
293+ raise HTTPException (404 , detail = "No active experience type knowledge base" )
276294 kb .add_multiple_experiences (experiences = experiences , flag = flag )
277295 return {"status" : "success" , "detail" : "Experiences added successfully" }
278296 except Exception as e :
@@ -291,17 +309,14 @@ def add_experiences_from_file(req: DataIn):
291309
292310@kb_app .post (path = "/v1/view_sub_questions" )
293311async def view_sub_questions (que : ExperienceIn ):
294- active_pl = ctx .get_pipeline_mgr ().get_active_pipeline ()
295- CONFIG_DIR
296- search_config_path = os .path .join (CONFIG_DIR ,"search_config.yaml" )
297- search_dir = os .path .join (CONFIG_DIR ,"experience_dir/experience.json" )
298- top1_issue , sub_questions_result = await query_search (
299- user_input = que .question ,
300- search_config_path = search_config_path ,
301- search_dir = search_dir ,
302- pl = active_pl
303- )
304- return sub_questions_result
312+ active_pl = ctx .get_pipeline_mgr ().get_active_pipeline ()
313+ CONFIG_DIR
314+ search_config_path = os .path .join (CONFIG_DIR , "search_config.yaml" )
315+ search_dir = os .path .join (CONFIG_DIR , "experience_dir/experience.json" )
316+ top1_issue , sub_questions_result = await query_search (
317+ user_input = que .question , search_config_path = search_config_path , search_dir = search_dir , pl = active_pl
318+ )
319+ return sub_questions_result
305320
306321
307322@kb_app .get ("/v1/kbadmin/kbs_list" )
@@ -310,9 +325,9 @@ def get_kbs_list():
310325 try :
311326 if not active_pl or active_pl .indexer .comp_subtype != "kbadmin_indexer" :
312327 return []
313- CONNECTION_ARGS = {"uri" : active_pl .indexer .vector_url }
328+ CONNECTION_ARGS = {"uri" : active_pl .indexer .vector_url }
314329 kbs_list = get_kbs_info (CONNECTION_ARGS )
315- kb_names = [name for name in kbs_list .keys ()]
330+ kb_names = [name for name in kbs_list .keys ()]
316331 return kb_names
317332 except Exception as e :
318333 raise HTTPException (status_code = 400 , detail = str (e ))
@@ -372,7 +387,7 @@ async def load_knowledge_from_file():
372387 for Knowledgebase_data in all_data :
373388 pipeline_req = KnowledgeBaseCreateIn (** Knowledgebase_data )
374389 kb = ctx .knowledgemgr .create_knowledge_base (pipeline_req )
375- if kb .comp_type == "knowledge" :
390+ if kb .comp_type == "knowledge" :
376391 if Knowledgebase_data ["file_map" ]:
377392 if active_pl .indexer .comp_subtype != "milvus_vector" and Knowledgebase_data ["active" ]:
378393 for file_path in Knowledgebase_data ["file_map" ].values ():
@@ -401,7 +416,15 @@ async def save_knowledge_to_file():
401416 kb_base = ctx .knowledgemgr .get_all_knowledge_bases ()
402417 knowledgebases_data = []
403418 for kb in kb_base :
404- kb_json = {"name" : kb .name , "description" : kb .description , "active" : kb .active , "file_map" : kb .file_map , "comp_type" : kb .comp_type , "comp_subtype" :kb .comp_subtype , "experience_active" : kb .experience_active }
419+ kb_json = {
420+ "name" : kb .name ,
421+ "description" : kb .description ,
422+ "active" : kb .active ,
423+ "file_map" : kb .file_map ,
424+ "comp_type" : kb .comp_type ,
425+ "comp_subtype" : kb .comp_subtype ,
426+ "experience_active" : kb .experience_active ,
427+ }
405428 knowledgebases_data .append (kb_json )
406429 json_str = json .dumps (knowledgebases_data , indent = 2 , ensure_ascii = False )
407430 with open (KNOWLEDGEBASE_FILE , "w" , encoding = "utf-8" ) as f :
@@ -412,6 +435,8 @@ async def save_knowledge_to_file():
412435
413436all_pipeline_milvus_maps = {"change_pl" : []}
414437current_pipeline_kb_map = {}
438+
439+
415440async def refresh_milvus_map (milvus_name ):
416441 current_pipeline_kb_map .clear ()
417442 knowledge_bases_list = await get_all_knowledge_bases ()
@@ -420,13 +445,13 @@ async def refresh_milvus_map(milvus_name):
420445 continue
421446 current_pipeline_kb_map [kb .name ] = kb .file_map
422447 all_pipeline_milvus_maps [milvus_name ] = copy .deepcopy (current_pipeline_kb_map )
423- milvus_maps_path = os .path .join (CONFIG_DIR ,"milvus_maps.json" )
448+ milvus_maps_path = os .path .join (CONFIG_DIR , "milvus_maps.json" )
424449 with open (milvus_maps_path , "w" , encoding = "utf-8" ) as f :
425450 json .dump (all_pipeline_milvus_maps , f , ensure_ascii = False , indent = 2 )
426451
427452
428453def read_milvus_maps ():
429- milvus_maps_path = os .path .join (CONFIG_DIR ,"milvus_maps.json" )
454+ milvus_maps_path = os .path .join (CONFIG_DIR , "milvus_maps.json" )
430455 global all_pipeline_milvus_maps
431456 try :
432457 with open (milvus_maps_path , "r" , encoding = "utf-8" ) as f :
@@ -435,38 +460,45 @@ def read_milvus_maps():
435460 all_pipeline_milvus_maps = {"change_pl" : []}
436461 return all_pipeline_milvus_maps
437462
463+
438464def save_change_pl (pl_name ):
439- if pl_name not in all_pipeline_milvus_maps ["change_pl" ]:
465+ if pl_name not in all_pipeline_milvus_maps ["change_pl" ]:
440466 return all_pipeline_milvus_maps ["change_pl" ].append (pl_name )
441467
468+
442469async def Synchronizing_vector_data (old_active_pl , new_active_pl , pl_change ):
443470 try :
444471 if pl_change :
445472 save_change_pl (new_active_pl .name )
446473 active_kb = ctx .knowledgemgr .get_active_knowledge_base ()
447- # Determine whether it is kbadmin type
474+ # Determine whether it is kbadmin type
448475 if old_active_pl :
449- if old_active_pl .retriever .comp_subtype == "kbadmin_retriever" and new_active_pl .retriever .comp_subtype == "kbadmin_retriever" :
476+ if (
477+ old_active_pl .retriever .comp_subtype == "kbadmin_retriever"
478+ and new_active_pl .retriever .comp_subtype == "kbadmin_retriever"
479+ ):
450480 if active_kb :
451481 if active_kb .comp_subtype == "kbadmin_kb" :
452482 new_active_pl .retriever .config_kbadmin_milvus (active_kb .name )
453483 return True
454484 elif old_active_pl .retriever .comp_subtype == "kbadmin_retriever" :
455485 return True
456-
457- milvus_name = (old_active_pl .name + str (old_active_pl .indexer .model_extra ["d" ]) if old_active_pl else "default_kb" )
486+
487+ milvus_name = (
488+ old_active_pl .name + str (old_active_pl .indexer .model_extra ["d" ]) if old_active_pl else "default_kb"
489+ )
458490 if not new_active_pl .status .active :
459491 if old_active_pl :
460492 if old_active_pl .indexer .comp_subtype == "milvus_vector" :
461493 await refresh_milvus_map (milvus_name )
462494 return True
463495 if not active_kb :
464496 return True
465- if new_active_pl .retriever .comp_subtype == "kbadmin_retriever" :
466- if active_kb :
467- if active_kb .comp_subtype == "kbadmin_kb" :
468- new_active_pl .retriever .config_kbadmin_milvus (active_kb .name )
469- return True
497+ if new_active_pl .retriever .comp_subtype == "kbadmin_retriever" :
498+ if active_kb :
499+ if active_kb .comp_subtype == "kbadmin_kb" :
500+ new_active_pl .retriever .config_kbadmin_milvus (active_kb .name )
501+ return True
470502 # Perform milvus data synchronization
471503 if new_active_pl .indexer .comp_subtype == "milvus_vector" :
472504 # Pipeline component state changed
0 commit comments