-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_local_directory_import.py
More file actions
737 lines (599 loc) · 28.3 KB
/
test_local_directory_import.py
File metadata and controls
737 lines (599 loc) · 28.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
"""Unit tests for the local_directory_import Tool plugin."""
import json
import pathlib
import stat
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# Helpers to import the module under test without needing a live DB/config
# ---------------------------------------------------------------------------
def _make_import_module():
    """Import ``local_directory_import`` with its external dependencies stubbed.

    Every dependency listed below is registered in ``sys.modules`` as a
    ``MagicMock`` -- but only when no module of that name is registered yet,
    so anything already imported is left alone. Any cached copy of the plugin
    is then evicted so the import statement executes the module afresh.

    Returns:
        A ``(plugin, installed)`` tuple: the imported plugin module and a
        dict holding just the stubs this call added to ``sys.modules``.
    """
    import sys

    # Module name -> attributes the stub must expose at import time.
    stub_attributes = {
        'fastapi': {},
        'pydantic': {'BaseModel': object},
        'sqlalchemy': {},
        'open_webui': {},
        'open_webui.config': {'UPLOAD_DIR': '/tmp/uploads'},
        'open_webui.internal': {},
        'open_webui.internal.db': {},
        'open_webui.models': {},
        'open_webui.models.files': {},
        'open_webui.models.knowledge': {},
        'open_webui.models.users': {},
        'open_webui.routers': {},
        'open_webui.routers.retrieval': {},
    }
    stubs = {name: MagicMock(**attrs) for name, attrs in stub_attributes.items()}

    # Register only the stubs whose names are not taken already.
    installed = {
        name: stub for name, stub in stubs.items() if name not in sys.modules
    }
    sys.modules.update(installed)

    # Drop any cached plugin so the import below re-executes it.
    sys.modules.pop('local_directory_import', None)
    import local_directory_import as plugin  # noqa: E402

    return plugin, installed
# The plugin is imported a single time, when this test module is loaded, and
# always through the mock shim above (fastapi/sqlalchemy/open_webui are not
# installed in the test environment).
_plugin_module, _ = _make_import_module()

# Module-level aliases so the tests can refer to plugin symbols directly.
# -- tool entry point and summary models
Tools = _plugin_module.Tools
ImportSummary = _plugin_module.ImportSummary
KBImportSummary = _plugin_module.KBImportSummary
ImportFileResult = _plugin_module.ImportFileResult
# -- private helpers under test
_discover_subfolders = _plugin_module._discover_subfolders
_discover_files = _plugin_module._discover_files
_find_file_by_hash = _plugin_module._find_file_by_hash
_find_or_create_kb = _plugin_module._find_or_create_kb
_vectorize_file = _plugin_module._vectorize_file
# -- constants
_INLINE_CONTENT_EXTENSIONS = _plugin_module._INLINE_CONTENT_EXTENSIONS
# ---------------------------------------------------------------------------
# T012 — _discover_subfolders & _discover_files
# ---------------------------------------------------------------------------
class TestDiscoverSubfolders:
    """Checks ``_discover_subfolders`` against a temporary drop folder."""

    @staticmethod
    def _make_dirs(root, *names):
        """Create one directory directly under *root* for each name."""
        for name in names:
            (root / name).mkdir()

    @staticmethod
    def _names(paths):
        """Return the final path component of every entry, in order."""
        return [entry.name for entry in paths]

    def test_returns_immediate_subdirs(self, tmp_path):
        """(a) Drop folder with two subfolders returns both."""
        self._make_dirs(tmp_path, 'alpha', 'beta')

        found = _discover_subfolders(tmp_path)

        assert self._names(found) == ['alpha', 'beta']

    def test_ignores_files_at_root(self, tmp_path):
        """(b) Drop folder with files at root but no subfolders returns empty list."""
        (tmp_path / 'readme.txt').write_text('hello')

        found = _discover_subfolders(tmp_path)

        assert found == []

    def test_empty_drop_folder(self, tmp_path):
        """(c) Empty drop folder returns empty list."""
        assert _discover_subfolders(tmp_path) == []

    def test_hidden_subdirs_excluded(self, tmp_path):
        """(d) Dot-prefixed directories are excluded except .attachments."""
        self._make_dirs(tmp_path, 'visible', '.git', '.hidden')

        found = _discover_subfolders(tmp_path)

        assert self._names(found) == ['visible']

    def test_attachments_subfolder_included(self, tmp_path):
        """(e) .attachments directory is NOT excluded — it is a valid content folder."""
        self._make_dirs(tmp_path, 'docs', '.attachments')

        found = _discover_subfolders(tmp_path)

        assert sorted(self._names(found)) == ['.attachments', 'docs']
class TestDiscoverFiles:
    """Checks ``_discover_files`` against temporary directory trees."""

    @staticmethod
    def _names(paths):
        """Return the final path component of every entry, in order."""
        return [entry.name for entry in paths]

    def test_flat_subfolder(self, tmp_path):
        """(d) Flat subfolder returns only allowed doc files, including png and svg."""
        text_fixtures = {
            'a.txt': 'a',
            'b.md': 'b',
            'c.json': '{"k": 1}',
            'f.svg': '<svg/>',
            'i.xml': '<xml/>',
        }
        binary_fixtures = {
            'e.pdf': b'%PDF-1.4',
            'd.png': b'\x89PNG',
            'g.bin': b'bin',  # the only fixture expected to be filtered out
            'h.jpg': b'jpg',
        }
        for name, text in text_fixtures.items():
            (tmp_path / name).write_text(text)
        for name, payload in binary_fixtures.items():
            (tmp_path / name).write_bytes(payload)

        found = _discover_files(tmp_path)

        assert sorted(self._names(found)) == [
            'a.txt', 'b.md', 'c.json', 'd.png', 'e.pdf', 'f.svg', 'h.jpg', 'i.xml',
        ]

    def test_nested_subdirectories(self, tmp_path):
        """(e) Nested subdirectories returns allowed files recursively."""
        sub = tmp_path / 'sub'
        sub.mkdir()
        (tmp_path / 'root.yaml').write_text('a: 1')
        (sub / 'nested.md').write_text('n')
        (sub / 'doc.pdf').write_bytes(b'%PDF-1.4')
        (sub / 'ignored.bin').write_bytes(b'bin')
        # test_flat_subfolder expects .jpg files to be discovered, so this
        # fixture is named as an included file and its presence is asserted.
        (sub / 'photo.jpg').write_bytes(b'jpg')

        names = sorted(self._names(_discover_files(tmp_path)))

        assert 'root.yaml' in names
        assert 'nested.md' in names
        assert 'doc.pdf' in names
        assert 'photo.jpg' in names
        assert 'ignored.bin' not in names

    def test_empty_subfolder(self, tmp_path):
        """(f) Empty subfolder returns empty list."""
        assert _discover_files(tmp_path) == []

    def test_hidden_directory_files_excluded(self, tmp_path):
        """(g) Files inside hidden directories (e.g. .git) are not discovered."""
        git_dir = tmp_path / '.git'
        git_dir.mkdir()
        (git_dir / 'config').write_text('[core]')
        (git_dir / 'COMMIT_EDITMSG').write_text('init')
        (tmp_path / 'readme.md').write_text('# hi')

        found = _discover_files(tmp_path)

        assert self._names(found) == ['readme.md']

    def test_nested_hidden_directory_files_excluded(self, tmp_path):
        """(h) Files nested inside any dot-prefixed dir (except .attachments) are excluded."""
        hidden = tmp_path / '.hidden' / 'sub'
        hidden.mkdir(parents=True)
        (hidden / 'notes.txt').write_text('secret')
        (tmp_path / 'visible.txt').write_text('visible')

        found = _discover_files(tmp_path)

        assert self._names(found) == ['visible.txt']

    def test_attachments_dir_files_included(self, tmp_path):
        """(i) Files inside .attachments are discovered (not a blocklisted dir)."""
        attachments = tmp_path / '.attachments'
        attachments.mkdir()
        (attachments / 'diagram.png').write_bytes(b'\x89PNG')
        (attachments / 'chart.svg').write_text('<svg/>')
        (tmp_path / 'readme.md').write_text('# hi')

        found = _discover_files(tmp_path)

        assert sorted(self._names(found)) == ['chart.svg', 'diagram.png', 'readme.md']
# ---------------------------------------------------------------------------
# T013 — _find_or_create_kb
# ---------------------------------------------------------------------------
class TestFindOrCreateKb:
    """Checks ``_find_or_create_kb`` lookup, creation and fallback paths."""

    @staticmethod
    def _execute_result(first_row):
        """Build a result mock whose ``scalars().first()`` yields *first_row*."""
        outcome = MagicMock()
        outcome.scalars.return_value.first.return_value = first_row
        return outcome

    @staticmethod
    def _kb(kb_id):
        """Build a stand-in knowledge-base row carrying only an ``id``."""
        row = MagicMock()
        row.id = kb_id
        return row

    @pytest.mark.asyncio
    async def test_existing_kb_returns_id_false(self):
        """(e) Existing KB by name returns (id, False)."""
        session = AsyncMock()
        session.execute = AsyncMock(
            return_value=self._execute_result(self._kb('existing-id'))
        )

        outcome = await _find_or_create_kb('my-kb', 'user-1', session)

        assert outcome == ('existing-id', False)

    @pytest.mark.asyncio
    async def test_missing_kb_creates_and_returns_true(self):
        """(f) No matching KB creates new KB and returns (new_id, True)."""
        session = AsyncMock()
        session.execute = AsyncMock(return_value=self._execute_result(None))
        create_stub = AsyncMock(return_value=self._kb('new-id'))

        with patch.object(
            _plugin_module.Knowledges, 'insert_new_knowledge', new=create_stub
        ):
            outcome = await _find_or_create_kb('new-kb', 'user-1', session)

        assert outcome == ('new-id', True)

    @pytest.mark.asyncio
    async def test_orm_lookup_failure_falls_back_to_knowledge_list(self):
        """ORM lookup failures fall back to Knowledges.get_knowledge_bases."""
        listed_kb = self._kb('fallback-id')
        listed_kb.name = 'my-kb'
        session = AsyncMock()
        session.execute = AsyncMock(side_effect=RuntimeError('orm broke'))

        with patch.object(
            _plugin_module.Knowledges,
            'get_knowledge_bases',
            new=AsyncMock(return_value=[listed_kb]),
        ):
            outcome = await _find_or_create_kb('my-kb', 'user-1', session)

        assert outcome == ('fallback-id', False)

    @pytest.mark.asyncio
    async def test_sync_db_execute_result_is_supported(self):
        """Synchronous SQLAlchemy execute results are accepted."""
        # A plain MagicMock session: execute() returns the result directly
        # instead of an awaitable.
        session = MagicMock()
        session.execute = MagicMock(
            return_value=self._execute_result(self._kb('existing-id'))
        )

        outcome = await _find_or_create_kb('my-kb', 'user-1', session)

        assert outcome == ('existing-id', False)
# ---------------------------------------------------------------------------
# _vectorize_file — inline content for YAML/JSON
# ---------------------------------------------------------------------------
class TestVectorizeFileInlineContent:
    """Checks which file types make ``_vectorize_file`` pass inline content."""

    @staticmethod
    async def _form_kwargs_for(file_path):
        """Run ``_vectorize_file`` on *file_path* and return the form kwargs.

        ``ProcessFileForm`` is replaced by a mock that records the keyword
        arguments it is called with; ``process_file`` is stubbed to a no-op.
        """
        seen = {}

        def record_form(**kwargs):
            seen.update(kwargs)
            return MagicMock()

        with (
            patch.object(
                _plugin_module, 'ProcessFileForm', MagicMock(side_effect=record_form)
            ),
            patch.object(
                _plugin_module, 'process_file', new=AsyncMock(return_value=None)
            ),
        ):
            await _vectorize_file(
                MagicMock(), 'fid', 'kid', MagicMock(), MagicMock(),
                file_path=file_path,
            )
        return seen

    @pytest.mark.asyncio
    async def test_yaml_content_passed_inline(self, tmp_path):
        """ProcessFileForm receives 'content' for .yml files."""
        source = tmp_path / 'toc.yml'
        source.write_text('- name: Learn\n href: /')

        form_kwargs = await self._form_kwargs_for(source)

        assert 'content' in form_kwargs
        assert '- name: Learn' in form_kwargs['content']

    @pytest.mark.asyncio
    async def test_json_content_passed_inline(self, tmp_path):
        """ProcessFileForm receives 'content' for .json files."""
        source = tmp_path / 'config.json'
        source.write_text('{"key": "value"}')

        form_kwargs = await self._form_kwargs_for(source)

        assert form_kwargs.get('content') == '{"key": "value"}'

    @pytest.mark.asyncio
    async def test_md_file_no_inline_content(self, tmp_path):
        """ProcessFileForm is not given 'content' for .md files (pipeline handles it)."""
        source = tmp_path / 'readme.md'
        source.write_text('# Hello')

        form_kwargs = await self._form_kwargs_for(source)

        assert 'content' not in form_kwargs

    def test_inline_content_extensions_set(self):
        """The inline-content extension set includes json, yml, yaml."""
        assert _INLINE_CONTENT_EXTENSIONS == {'.json', '.yml', '.yaml'}
class TestFindFileByHash:
    """Checks ``_find_file_by_hash`` against a synchronous DB session."""

    @pytest.mark.asyncio
    async def test_sync_db_execute_result_is_supported(self):
        """Hash lookup works when db.execute returns a sync result object."""
        stored_file = MagicMock()
        stored_file.id = 'file-id'

        query_result = MagicMock()
        query_result.scalars.return_value.first.return_value = stored_file

        # Plain MagicMock session: execute() is a regular, non-awaitable call.
        session = MagicMock()
        session.execute = MagicMock(return_value=query_result)

        found = await _find_file_by_hash('abc123', session)

        assert found is stored_file
# ---------------------------------------------------------------------------
# T014 — import_local_directory happy path
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_import_local_directory_happy_path(tmp_path):
    """
    Integration-style unit test: two subfolders with nested files.
    First subfolder maps to an existing KB; second to a new KB.
    """
    # Drop-folder layout:
    #   power-platform/readme.md
    #   power-platform/docs/guide.md
    #   azure-functions/main.yml
    power_platform = tmp_path / 'power-platform'
    (power_platform / 'docs').mkdir(parents=True)
    (power_platform / 'readme.md').write_text('pp readme')
    (power_platform / 'docs' / 'guide.md').write_text('pp guide')

    azure_functions = tmp_path / 'azure-functions'
    azure_functions.mkdir()
    (azure_functions / 'main.yml').write_text('name: azure-functions')

    admin_user = {
        'id': 'admin-1',
        'email': 'admin@example.com',
        'name': 'Admin',
        'role': 'admin',
    }
    mock_request = MagicMock()

    # Stand-in async context manager handed out by the patched get_async_db.
    db_context = MagicMock()
    db_context.__aenter__ = AsyncMock(return_value=MagicMock())
    db_context.__aexit__ = AsyncMock(return_value=False)

    # Stub every persistence/vectorization collaborator of the plugin.
    with (
        patch.object(
            _plugin_module.Knowledges,
            'add_file_to_knowledge_by_id',
            new=AsyncMock(return_value=MagicMock()),
        ),
        patch.object(
            _plugin_module.Files,
            'insert_new_file',
            new=AsyncMock(return_value=MagicMock()),
        ),
        patch.object(_plugin_module, 'process_file', new=AsyncMock(return_value=None)),
        patch('shutil.copy'),
        patch.object(_plugin_module, '_find_or_create_kb') as kb_lookup,
        patch.object(_plugin_module, '_find_file_by_hash', new=AsyncMock(return_value=None)),
        patch.object(_plugin_module, 'get_async_db') as get_db,
    ):
        # Results are consumed in call order: azure-functions (sorted first)
        # resolves to an existing KB, power-platform to a newly created one.
        kb_lookup.side_effect = [
            ('kb-az', False),
            ('kb-pp', True),
        ]
        get_db.return_value = db_context

        tools = Tools()
        tools.valves.drop_folder = str(tmp_path)
        result_str = await tools.import_local_directory(admin_user, mock_request)

    data = json.loads(result_str)

    # Top-level summary.
    assert data.get('error') is None
    assert data['total_discovered'] == 3  # 2 + 1 supported files
    assert len(data['knowledge_bases']) == 2
    assert data['duration_seconds'] >= 0
    assert data['files_per_second'] >= 0

    # Per-KB summaries, keyed by name.
    by_name = {entry['kb_name']: entry for entry in data['knowledge_bases']}
    azure_summary = by_name['azure-functions']
    pp_summary = by_name['power-platform']

    assert azure_summary['kb_created'] is False
    assert pp_summary['kb_created'] is True
    assert azure_summary['files'][0]['relative_path'] == 'main.yml'
    assert azure_summary['duration_seconds'] >= 0
    assert azure_summary['files_per_second'] >= 0

    # relative_path is relative to the subfolder root.
    pp_rel_paths = {item['relative_path'] for item in pp_summary['files']}
    assert 'readme.md' in pp_rel_paths
    # docs/guide.md uses the platform path separator, so match on the name.
    assert any('guide.md' in rel for rel in pp_rel_paths)
# ---------------------------------------------------------------------------
# T016 — Access control
# ---------------------------------------------------------------------------
class TestAccessControl:
    """Checks the guard clauses that run before any import work starts."""

    @staticmethod
    def _admin():
        """Return a fresh admin user payload."""
        return {'id': 'a1', 'role': 'admin', 'email': 'a@x.com', 'name': 'A'}

    @staticmethod
    def _tools_for(drop_folder):
        """Create a ``Tools`` instance with its drop-folder valve set."""
        tools = Tools()
        tools.valves.drop_folder = drop_folder
        return tools

    @pytest.mark.asyncio
    async def test_non_admin_returns_error_summary(self):
        """(a) Non-admin role returns error summary with all counts 0."""
        tools = self._tools_for('/tmp')
        regular_user = {'id': 'u1', 'role': 'user', 'email': 'u@x.com', 'name': 'U'}

        with patch('shutil.copy') as copy_spy:
            raw = await tools.import_local_directory(regular_user, MagicMock())

        copy_spy.assert_not_called()
        summary = json.loads(raw)
        assert summary['error'] == 'Access denied: admin role required'
        assert summary['total_discovered'] == 0
        assert summary['total_imported'] == 0

    @pytest.mark.asyncio
    async def test_blank_drop_folder_returns_error(self):
        """(b) Blank drop_folder valve returns error summary with all counts 0."""
        tools = self._tools_for('')

        with patch('shutil.copy') as copy_spy:
            raw = await tools.import_local_directory(self._admin(), MagicMock())

        copy_spy.assert_not_called()
        summary = json.loads(raw)
        assert summary['error'] is not None
        assert summary['total_discovered'] == 0

    @pytest.mark.asyncio
    async def test_nonexistent_drop_folder_returns_error(self, tmp_path):
        """(c) Non-existent drop folder (via valve) returns error before file discovery."""
        tools = self._tools_for(str(tmp_path / 'does_not_exist'))

        with patch.object(_plugin_module, '_discover_subfolders') as discover_spy:
            raw = await tools.import_local_directory(self._admin(), MagicMock())

        discover_spy.assert_not_called()
        summary = json.loads(raw)
        assert summary['error'] is not None
        assert summary['total_discovered'] == 0
# ---------------------------------------------------------------------------
# T020 — Vectorization
# ---------------------------------------------------------------------------
class TestVectorization:
    """Checks how vectorization outcomes are reflected in the import summary."""

    @staticmethod
    def _admin():
        """Return a fresh admin user payload."""
        return {'id': 'a1', 'role': 'admin', 'email': 'a@x.com', 'name': 'A'}

    @staticmethod
    def _seed_single_file_kb(root):
        """Create ``kb1/file.md`` under *root*."""
        kb_dir = root / 'kb1'
        kb_dir.mkdir()
        (kb_dir / 'file.md').write_text('content')

    @staticmethod
    def _async_db_context():
        """Build a mock usable as ``async with get_async_db() as db``."""
        ctx = MagicMock()
        ctx.__aenter__ = AsyncMock(return_value=MagicMock())
        ctx.__aexit__ = AsyncMock(return_value=False)
        return ctx

    @pytest.mark.asyncio
    async def test_successful_vectorization_increments_processed(self, tmp_path):
        """(a) Successful vectorization increments processed count."""
        self._seed_single_file_kb(tmp_path)
        process_stub = AsyncMock(return_value=None)

        with (
            patch.object(
                _plugin_module.Files,
                'insert_new_file',
                new=AsyncMock(return_value=MagicMock()),
            ),
            patch.object(
                _plugin_module.Knowledges,
                'add_file_to_knowledge_by_id',
                new=AsyncMock(return_value=MagicMock()),
            ),
            patch.object(_plugin_module, 'process_file', new=process_stub),
            patch('shutil.copy'),
            patch.object(_plugin_module, '_insert_file_record', new=AsyncMock(return_value=None)),
            patch.object(_plugin_module, '_find_or_create_kb', new=AsyncMock(return_value=('kb-id', False))),
            patch.object(_plugin_module, '_find_file_by_hash', new=AsyncMock(return_value=None)),
            patch.object(_plugin_module, 'get_async_db') as get_db,
        ):
            get_db.return_value = self._async_db_context()
            tools = Tools()
            tools.valves.drop_folder = str(tmp_path)
            raw = await tools.import_local_directory(self._admin(), MagicMock())

        summary = json.loads(raw)
        assert summary['total_processed'] == 1
        assert summary['total_failed'] == 0
        process_stub.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_vectorization_failure_marks_file_and_retains_record(self, tmp_path):
        """(b) Vectorization exception: status=vectorization_failed, file record kept, failed incremented."""
        self._seed_single_file_kb(tmp_path)
        record_stub = AsyncMock(return_value=None)
        failing_process = AsyncMock(side_effect=RuntimeError('embedding failed'))

        with (
            patch.object(
                _plugin_module.Files,
                'insert_new_file',
                new=AsyncMock(return_value=MagicMock()),
            ),
            patch.object(
                _plugin_module.Knowledges,
                'add_file_to_knowledge_by_id',
                new=AsyncMock(return_value=MagicMock()),
            ),
            patch.object(_plugin_module, 'process_file', new=failing_process),
            patch('shutil.copy'),
            patch.object(_plugin_module, '_insert_file_record', new=record_stub),
            patch.object(_plugin_module, '_find_or_create_kb', new=AsyncMock(return_value=('kb-id', False))),
            patch.object(_plugin_module, '_find_file_by_hash', new=AsyncMock(return_value=None)),
            patch.object(_plugin_module, 'get_async_db') as get_db,
        ):
            get_db.return_value = self._async_db_context()
            tools = Tools()
            tools.valves.drop_folder = str(tmp_path)
            raw = await tools.import_local_directory(self._admin(), MagicMock())

        kb_summary = json.loads(raw)['knowledge_bases'][0]
        assert kb_summary['failed'] == 1
        assert kb_summary['processed'] == 0
        assert kb_summary['files'][0]['status'] == 'vectorization_failed'
        # File record was created (_insert_file_record was called)
        record_stub.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_sync_insert_new_file_return_is_supported(self, tmp_path):
        """Sync Files.insert_new_file return values should not break insert."""
        dest_path = tmp_path / 'file.md'
        dest_path.write_text('content')

        # insert_new_file is a plain MagicMock here: it returns a value
        # directly rather than an awaitable.
        sync_insert = MagicMock(return_value=MagicMock())
        files_stub = SimpleNamespace(insert_new_file=sync_insert)

        def plain_form(**kwargs):
            return SimpleNamespace(**kwargs)

        with (
            patch.object(_plugin_module, 'Files', new=files_stub),
            patch.object(_plugin_module, 'FileForm', new=plain_form),
        ):
            await _plugin_module._insert_file_record(
                user_id='u1',
                file_id='f1',
                filename='file.md',
                dest_path=dest_path,
                relative_path='file.md',
                file_hash='abc123',
            )

        files_stub.insert_new_file.assert_called_once()
@pytest.mark.asyncio
async def test_import_local_directory_supports_generator_db_dependency(tmp_path):
    """Generator-based DB dependency is accepted for Open WebUI compatibility."""
    kb_dir = tmp_path / 'kb1'
    kb_dir.mkdir()
    (kb_dir / 'file.md').write_text('content')

    admin_user = {'id': 'a1', 'role': 'admin', 'email': 'a@x.com', 'name': 'A'}
    fake_db = MagicMock()

    def db_generator():
        yield fake_db

    # get_async_db() hands back a plain generator rather than an async
    # context manager.
    db_dependency = MagicMock(return_value=db_generator())
    find_kb_stub = AsyncMock(return_value=('kb-id', False))

    with (
        patch.object(
            _plugin_module.Files,
            'insert_new_file',
            new=AsyncMock(return_value=MagicMock()),
        ),
        patch.object(
            _plugin_module.Knowledges,
            'add_file_to_knowledge_by_id',
            new=AsyncMock(return_value=MagicMock()),
        ),
        patch.object(_plugin_module, 'process_file', new=AsyncMock(return_value=None)),
        patch('shutil.copy'),
        patch.object(_plugin_module, '_insert_file_record', new=AsyncMock(return_value=None)),
        patch.object(_plugin_module, '_find_or_create_kb', new=find_kb_stub),
        patch.object(_plugin_module, '_find_file_by_hash', new=AsyncMock(return_value=None)),
        patch.object(_plugin_module, 'get_async_db', new=db_dependency),
    ):
        tools = Tools()
        tools.valves.drop_folder = str(tmp_path)
        raw = await tools.import_local_directory(admin_user, MagicMock())

    summary = json.loads(raw)
    assert summary['error'] is None
    assert summary['total_processed'] == 1
    # The object yielded by the generator is what reaches the KB lookup.
    find_kb_stub.assert_awaited_once_with('kb1', 'a1', fake_db)
@pytest.mark.asyncio
async def test_import_local_directory_surfaces_kb_creation_errors(tmp_path):
    """KB creation errors are reported in the JSON summary instead of hidden."""
    kb_dir = tmp_path / 'kb1'
    kb_dir.mkdir()
    (kb_dir / 'file.md').write_text('content')

    admin_user = {'id': 'a1', 'role': 'admin', 'email': 'a@x.com', 'name': 'A'}

    # Stand-in async context manager handed out by the patched get_async_db.
    db_context = MagicMock()
    db_context.__aenter__ = AsyncMock(return_value=MagicMock())
    db_context.__aexit__ = AsyncMock(return_value=False)

    failing_lookup = AsyncMock(
        side_effect=RuntimeError('knowledge create failed: boom')
    )

    with (
        patch.object(_plugin_module, '_find_or_create_kb', new=failing_lookup),
        patch.object(_plugin_module, 'get_async_db') as get_db,
    ):
        get_db.return_value = db_context
        tools = Tools()
        tools.valves.drop_folder = str(tmp_path)
        raw = await tools.import_local_directory(admin_user, MagicMock())

    summary = json.loads(raw)
    assert summary['error'] == 'One or more knowledge bases failed to import; see knowledge_bases[*].error'
    assert summary['total_failed'] == 1
    assert summary['knowledge_bases'][0]['error'] == 'knowledge create failed: boom'
    assert summary['duration_seconds'] >= 0
    assert summary['files_per_second'] >= 0
class TestVectorizationMixedResults:
    """Vectorization case (c): a mix of successes and failures in one KB.

    The test method takes ``self`` but follows module-level test functions,
    so it needs an enclosing class for pytest to collect and run it. The
    class has its own name so it does not shadow ``TestVectorization``.
    """

    @pytest.mark.asyncio
    async def test_mixed_vectorization_results_accurate_counts(self, tmp_path):
        """(c) Mixed success/failure across files: summary counts accurate."""
        kb_dir = tmp_path / 'kb1'
        kb_dir.mkdir()
        (kb_dir / 'good.md').write_text('good')
        (kb_dir / 'bad.md').write_text('bad')

        admin_user = {'id': 'a1', 'role': 'admin', 'email': 'a@x.com', 'name': 'A'}

        attempts = 0

        async def fail_every_second_call(*args, **kwargs):
            """Succeed on odd-numbered calls, raise on even-numbered ones."""
            nonlocal attempts
            attempts += 1
            if attempts % 2 == 0:
                raise RuntimeError('fail')

        # Stand-in async context manager handed out by the patched get_async_db.
        db_context = MagicMock()
        db_context.__aenter__ = AsyncMock(return_value=MagicMock())
        db_context.__aexit__ = AsyncMock(return_value=False)

        with (
            patch.object(
                _plugin_module.Files,
                'insert_new_file',
                new=AsyncMock(return_value=MagicMock()),
            ),
            patch.object(
                _plugin_module.Knowledges,
                'add_file_to_knowledge_by_id',
                new=AsyncMock(return_value=MagicMock()),
            ),
            patch.object(_plugin_module, 'process_file', new=fail_every_second_call),
            patch('shutil.copy'),
            patch.object(_plugin_module, '_insert_file_record', new=AsyncMock(return_value=None)),
            patch.object(_plugin_module, '_find_or_create_kb', new=AsyncMock(return_value=('kb-id', False))),
            patch.object(_plugin_module, '_find_file_by_hash', new=AsyncMock(return_value=None)),
            patch.object(_plugin_module, 'get_async_db') as get_db,
        ):
            get_db.return_value = db_context
            tools = Tools()
            tools.valves.drop_folder = str(tmp_path)
            raw = await tools.import_local_directory(admin_user, MagicMock())

        summary = json.loads(raw)
        # Two files, one process_file failure: one processed, one failed.
        assert summary['total_processed'] == 1
        assert summary['total_failed'] == 1
        assert summary['total_imported'] == 2
        assert summary['total_linked'] == 2