-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathcolored_comments.py
More file actions
1121 lines (890 loc) · 44.4 KB
/
colored_comments.py
File metadata and controls
1121 lines (890 loc) · 44.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import sublime
import sublime_plugin
import asyncio
from pathlib import Path
from typing import Optional, Dict, List, Set
from dataclasses import dataclass, field
from contextlib import asynccontextmanager
import threading
import time
import sublime_aio
from .plugin import logger as log
from sublime_lib import ResourcePath
from .plugin.settings import load_settings, settings, unload_settings
from .templates import SCHEME_TEMPLATE
NAME = "Colored Comments"
VERSION = "4.0.1"
comment_selector = "comment - punctuation.definition.comment"
KIND_SCHEME = (sublime.KIND_ID_VARIABLE, "s", "Scheme")
DEFAULT_CS = 'Packages/Color Scheme - Default/Mariana.sublime-color-scheme'
@dataclass
class TagResult:
"""Data class for tag scan results."""
tag: str
line: str
line_num: int
file: str
relative_path: str = field(init=False)
def __post_init__(self):
try:
# Try to get relative path from first project folder
folders = sublime.active_window().folders() if sublime.active_window() else []
if folders:
self.relative_path = str(Path(self.file).relative_to(Path(folders[0])))
else:
self.relative_path = Path(self.file).name
except (ValueError, AttributeError):
self.relative_path = Path(self.file).name
class BaseCommentProcessor:
"""Base class for comment processing functionality."""
def __init__(self, view: sublime.View):
self.view = view
def should_process_view(self) -> bool:
"""Check if view should be processed based on syntax settings."""
syntax = self.view.settings().get("syntax")
should_process = syntax not in settings.disabled_syntax
log.debug(f"View {self.view.id()} syntax check: {syntax}, should_process: {should_process}")
return should_process
def find_comment_regions(self) -> List[sublime.Region]:
"""Find all comment regions in the view."""
regions = self.view.find_by_selector(comment_selector)
log.debug(f"View {self.view.id()} found {len(regions)} comment regions")
return regions
class CommentDecorationManager(BaseCommentProcessor):
"""Manages comment decorations with optimized processing."""
def __init__(self, view: sublime.View):
super().__init__(view)
self._last_change_count = 0
self._last_region_row = -1
self._processing = False
log.debug(f"CommentDecorationManager created for view {view.id()}")
def needs_update(self) -> bool:
"""Check if view needs update and update change count."""
current_change = self.view.change_count()
needs_update = current_change != self._last_change_count
self._last_change_count = current_change
log.debug(f"View {self.view.id()} needs update: {needs_update}")
return needs_update
async def process_comment_line(self, line: str, reg: sublime.Region, line_num: int,
to_decorate: Dict[str, List[sublime.Region]],
prev_match: str = "") -> Optional[str]:
"""Process a single comment line for decoration."""
if not (stripped_line := line.strip()):
return None
if not settings.get_matching_pattern().startswith(" "):
line = stripped_line
# Check adjacency for continuation
current_row = line_num - 1
is_adjacent = (self._last_region_row != -1 and
current_row == self._last_region_row + 1)
# Try tag patterns first
for identifier, regex in settings.tag_regex.items():
if regex.search(line.strip()):
to_decorate.setdefault(identifier, []).append(reg)
self._last_region_row = current_row
log.debug(f"Matched tag '{identifier}' at line: {stripped_line[:50]}...")
return identifier
# Check for continuation
if (prev_match and is_adjacent and
((settings.continued_matching and line.startswith(settings.get_matching_pattern())) or
settings.auto_continue_highlight)):
to_decorate.setdefault(prev_match, []).append(reg)
self._last_region_row = current_row
log.debug(f"Continued tag '{prev_match}' at line: {stripped_line[:50]}...")
return prev_match
return None
def apply_region_styles(self, to_decorate: Dict[str, List[sublime.Region]]):
"""Apply visual styles to decorated regions."""
total_regions = sum(len(regions) for regions in to_decorate.values())
log.debug(f"Applying styles to {total_regions} regions across {len(to_decorate)} tag types")
for identifier, regions in to_decorate.items():
if tag := settings.tags.get(identifier):
self.view.add_regions(
identifier.lower(),
regions,
settings.get_scope_for_region(identifier, tag),
icon=settings.get_icon(),
flags=settings.get_flags(tag)
)
def clear_decorations(self):
"""Clear all existing decorations."""
log.debug(f"Clearing decorations for view {self.view.id()}")
for key in settings.region_keys:
self.view.erase_regions(key)
async def apply_decorations(self):
"""Apply decorations asynchronously with batching."""
if self._processing or not self.should_process_view():
return
self._processing = True
try:
# Check if update is needed
needs_update = self.needs_update()
has_existing = any(len(self.view.get_regions(key)) > 0 for key in settings.region_keys)
if not needs_update and has_existing:
return
to_decorate: Dict[str, List[sublime.Region]] = {}
prev_match = ""
self._last_region_row = -1
# Process comment regions in batches
for region in self.find_comment_regions():
line = self.view.substr(region)
line_num = self.view.rowcol(region.begin())[0] + 1
if result := await self.process_comment_line(line, region, line_num, to_decorate, prev_match):
prev_match = result
self.clear_decorations()
self.apply_region_styles(to_decorate)
log.debug(f"Decoration process complete for view {self.view.id()}")
except Exception as e:
log.debug(f"Error in apply_decorations: {e}")
finally:
self._processing = False
class FileScanner:
"""Handles file scanning operations with optimized filtering."""
@classmethod
def should_skip_file(cls, file_path: Path) -> bool:
"""Check if file should be skipped."""
return (file_path.suffix.lower() in settings.skip_extensions or
any(part in settings.skip_dirs for part in file_path.parts))
@classmethod
async def get_project_files(cls, folders: List[str]) -> List[Path]:
"""Get all valid text files from project folders."""
files = []
for folder in folders:
folder_path = Path(folder)
try:
all_files = list(folder_path.rglob('*'))
valid_files = [
f for f in all_files
if f.is_file() and not cls.should_skip_file(f)
]
files.extend(valid_files)
except (OSError, PermissionError) as e:
log.debug(f"Error scanning folder {folder_path}: {e}")
return files
class AsyncTagScanner(BaseCommentProcessor):
"""Async tag scanner with optimized file processing."""
def __init__(self, window: sublime.Window):
self.window = window
self._temp_panel = None
log.debug(f"AsyncTagScanner created for window {window.id()}")
async def scan_for_tags(self, *, tag_filter: Optional[str] = None,
current_file_only: bool = False) -> List[TagResult]:
"""Scan for tags with optimized batch processing."""
files = await self._get_files_to_scan(current_file_only)
if not files:
return []
results = []
batch_size = 12 # Larger batch size since async sleeps were removed
for i in range(0, len(files), batch_size):
batch = files[i:i + batch_size]
# Create parallel tasks with unique panel names
batch_tasks = []
for j, file_path in enumerate(batch):
panel_name = f'_colored_comments_temp_view_{i}_{j}'
batch_tasks.append(self._scan_file_with_unique_panel(file_path, panel_name, tag_filter))
batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
for result in batch_results:
if isinstance(result, list):
results.extend(result)
progress = min(100, int(((i + batch_size) / len(files)) * 100))
sublime.status_message(f"Scanning tags... {progress}% ({len(results)} found)")
return results
async def _get_files_to_scan(self, current_file_only: bool) -> List[Path]:
"""Get files to scan based on scope."""
if current_file_only:
if (view := self.window.active_view()) and (file_name := view.file_name()):
return [Path(file_name)]
return []
folders = self.window.folders()
return await FileScanner.get_project_files(folders) if folders else []
@asynccontextmanager
async def _get_view_for_file(self, file_path: Path, panel_name: str = '_colored_comments_temp_view'):
"""Context manager for getting a view for a file with unique panel name."""
# Check if already open
for view in self.window.views():
if view.file_name() == str(file_path):
yield view
return
# Create a fresh temp panel for this file
temp_panel = None
try:
# Read file asynchronously to avoid blocking UI
content = await self._read_file_async(file_path)
if content is None:
yield None
return
# Create fresh panel for this file with unique name
temp_panel = self.window.create_output_panel(panel_name)
temp_panel.run_command('append', {'characters': content})
if syntax := sublime.find_syntax_for_file(str(file_path)):
temp_panel.assign_syntax(syntax)
yield temp_panel
finally:
# Immediately destroy the panel after use
if temp_panel:
self.window.destroy_output_panel(panel_name)
async def _read_file_async(self, file_path: Path) -> Optional[str]:
"""Read file content asynchronously to avoid blocking UI."""
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
return content
except Exception as e:
log.debug(f"Error reading file {file_path}: {e}")
return None
async def _scan_file(self, file_path: Path, tag_filter: Optional[str]) -> List[TagResult]:
"""Scan a single file for tags."""
return await self._scan_file_with_unique_panel(file_path, '_colored_comments_temp_view', tag_filter)
async def _scan_file_with_unique_panel(self, file_path: Path, panel_name: str, tag_filter: Optional[str]) -> List[TagResult]:
"""Scan a single file for tags using a unique panel name."""
results = []
try:
async with self._get_view_for_file(file_path, panel_name) as view:
if not view:
return results
# Use base class functionality
super().__init__(view)
if not self.should_process_view():
return results
for region in self.find_comment_regions():
for reg in view.split_by_newlines(region):
line = view.substr(reg)
line_num = view.rowcol(reg.begin())[0] + 1
if tag_results := await self._process_comment_line(line, line_num, file_path, tag_filter):
results.extend(tag_results)
except Exception as e:
log.debug(f"Error scanning file {file_path}: {e}")
return results
async def _process_comment_line(self, line: str, line_num: int, file_path: Path,
tag_filter: Optional[str]) -> List[TagResult]:
"""Process a comment line for tags."""
if not (stripped_line := line.strip()):
return []
if not settings.get_matching_pattern().startswith(" "):
line = stripped_line
results = []
for tag_name, regex in settings.tag_regex.items():
if tag_filter and tag_name.lower() != tag_filter.lower():
continue
if regex.search(line.strip()):
results.append(TagResult(
tag=tag_name,
line=stripped_line,
line_num=line_num,
file=str(file_path)
))
return results
class TagIndex:
"""Thread-safe tag index with optimized operations."""
def __init__(self):
self._file_index: Dict[str, List[TagResult]] = {}
self._tag_index: Dict[str, List[TagResult]] = {}
self._file_timestamps: Dict[str, float] = {}
self._indexed_folders: Set[str] = set()
self._lock = threading.RLock()
self._indexing = False
self._indexing_complete_event: Optional[asyncio.Event] = None # Lazy initialization
def _get_or_create_event(self) -> asyncio.Event:
"""Get or create the indexing complete event for the current loop."""
if self._indexing_complete_event is None:
self._indexing_complete_event = asyncio.Event()
return self._indexing_complete_event
def is_indexed(self, folders: List[str]) -> bool:
"""Check if folders are already indexed."""
with self._lock:
return all(folder in self._indexed_folders for folder in folders)
def is_indexing(self) -> bool:
"""Check if indexing is currently in progress."""
with self._lock:
return self._indexing
async def wait_for_indexing_complete(self) -> None:
"""Wait for indexing to complete using an event."""
if not self.is_indexing():
return # Already complete
# Get event for current loop
event = self._get_or_create_event()
await event.wait()
async def build_initial_index(self, window: sublime.Window) -> None:
"""Build initial tag index for the window's project folders."""
folders = window.folders()
if not folders:
log.debug("No project folders found for indexing")
return
with self._lock:
if self._indexing:
log.debug("Index building already in progress")
return
if self.is_indexed(folders):
log.debug("Folders already indexed")
return
self._indexing = True
# Create fresh event for current loop
self._indexing_complete_event = asyncio.Event()
try:
start_time = time.perf_counter()
log.debug(f"Starting tag index build for folders: {folders}")
# Get all files to index
files = await FileScanner.get_project_files(folders)
if not files:
log.debug("No files found to index")
return
scanner = AsyncTagScanner(window)
total_tags = 0
files_processed = 0
sublime.status_message(f"🔍 Building tag index... (0/{len(files)} files)")
# Index files in batches - each file gets its own temp panel
batch_size = 8
for i in range(0, len(files), batch_size):
batch = files[i:i + batch_size]
# Create tasks for parallel processing with unique panel names
tasks = []
for j, file_path in enumerate(batch):
panel_name = f'_colored_comments_temp_view_{i}_{j}'
task = asyncio.create_task(
self._index_file(file_path, scanner, force_update=False, panel_name=panel_name)
)
tasks.append(task)
# Wait for batch to complete
await asyncio.gather(*tasks, return_exceptions=True)
files_processed += len(batch)
current_tags = self.get_total_tag_count()
sublime.status_message(
f"🔍 Building tag index... "
f"({files_processed}/{len(files)} files, {current_tags} tags found)"
)
# Mark folders as indexed
with self._lock:
self._indexed_folders.update(folders)
elapsed = time.perf_counter() - start_time
total_tags = self.get_total_tag_count()
sublime.status_message(
f"✅ Tag index built: {total_tags} tags in {files_processed} files ({elapsed:.2f}s)"
)
sublime.set_timeout(lambda: sublime.status_message(""), 3000)
log.debug(f"Index build complete: {total_tags} tags, {elapsed:.2f}s")
except Exception as e:
log.debug(f"Error building tag index: {e}")
sublime.status_message(f"❌ Tag index build failed: {str(e)}")
sublime.set_timeout(lambda: sublime.status_message(""), 5000)
finally:
with self._lock:
self._indexing = False
# Signal completion on the event if it exists
if self._indexing_complete_event:
self._indexing_complete_event.set()
async def _index_file(self, file_path: Path, scanner: AsyncTagScanner, force_update: bool = False, panel_name: Optional[str] = None) -> None:
"""Index a single file."""
file_str = str(file_path)
current_time = time.time()
# Check if file needs indexing
if not force_update:
with self._lock:
if file_str in self._file_timestamps:
try:
file_mtime = file_path.stat().st_mtime
if file_mtime <= self._file_timestamps[file_str]:
return # File hasn't changed
except (OSError, AttributeError):
pass # File may not exist, proceed with indexing
try:
# Use unique panel name if provided
if panel_name:
results = await scanner._scan_file_with_unique_panel(file_path, panel_name, None)
else:
results = await scanner._scan_file(file_path, None)
# Update index
with self._lock:
# Remove old entries for this file
self._remove_file_from_internal_index(file_str)
# Add new results
if results:
self._file_index[file_str] = results
for result in results:
self._tag_index.setdefault(result.tag, []).append(result)
# Update timestamp
self._file_timestamps[file_str] = current_time
log.debug(f"Indexed {len(results)} tags from {file_path.name}")
except Exception as e:
log.debug(f"Error indexing file {file_path}: {e}")
def update_file_index(self, file_path: str, window: sublime.Window) -> None:
"""Update index for a specific file (called when file is modified)."""
if self._indexing:
return
# Run async update in background using sublime_aio
def on_update_done(future):
try:
future.result()
except Exception as e:
log.debug(f"Error in file index update callback: {e}")
sublime_aio.run_coroutine(self._update_file_async(file_path, window)).add_done_callback(on_update_done)
async def _update_file_async(self, file_path: str, window: sublime.Window) -> None:
"""Async helper for updating file index."""
try:
path = Path(file_path)
if not path.exists() or FileScanner.should_skip_file(path):
with self._lock:
if file_path in self._file_index:
self.remove_file_from_index(file_path)
sublime.status_message(f"🗑️ Removed {Path(file_path).name} from tag index")
sublime.set_timeout(lambda: sublime.status_message(""), 2000)
return
scanner = AsyncTagScanner(window)
old_count = len(self._file_index.get(file_path, []))
# Force update to ensure fresh line numbers
await self._index_file(path, scanner, force_update=True)
new_count = len(self._file_index.get(file_path, []))
# Show brief status update for significant changes
if new_count != old_count:
filename = Path(file_path).name
if new_count > old_count:
sublime.status_message(f"📝 Updated tag index: +{new_count - old_count} tags in {filename}")
elif old_count > 0:
sublime.status_message(f"📝 Updated tag index: -{old_count - new_count} tags in {filename}")
else:
sublime.status_message(f"📝 Updated tag index: {filename}")
sublime.set_timeout(lambda: sublime.status_message(""), 3000)
log.debug(f"Updated index for file: {file_path} ({old_count} -> {new_count} tags)")
except Exception as e:
log.debug(f"Error updating file index for {file_path}: {e}")
sublime.status_message(f"❌ Error updating tag index for {Path(file_path).name}")
sublime.set_timeout(lambda: sublime.status_message(""), 3000)
def remove_file_from_index(self, file_path: str) -> None:
"""Remove a file from the index (when deleted)."""
with self._lock:
self._remove_file_from_internal_index(file_path)
self._file_timestamps.pop(file_path, None)
def _remove_file_from_internal_index(self, file_path: str) -> None:
"""Internal method to remove file from index (assumes lock is held)."""
# Remove from main index
old_results = self._file_index.pop(file_path, [])
# Remove from tag index
for result in old_results:
if result.tag in self._tag_index:
self._tag_index[result.tag] = [
r for r in self._tag_index[result.tag]
if r.file != file_path
]
if not self._tag_index[result.tag]:
del self._tag_index[result.tag]
def get_all_tags(self, tag_filter: Optional[str] = None, current_file_only: bool = False,
current_file_path: Optional[str] = None) -> List[TagResult]:
"""Get all tags from the index."""
with self._lock:
results = []
if current_file_only and current_file_path:
# Get tags only from current file
results = self._file_index.get(current_file_path, [])
else:
# Get all tags
if tag_filter and tag_filter in self._tag_index:
results = self._tag_index[tag_filter][:]
else:
for file_results in self._file_index.values():
results.extend(file_results)
# Apply tag filter if specified and not using tag index
if tag_filter and not current_file_only:
results = [r for r in results if r.tag.lower() == tag_filter.lower()]
return results
def get_total_tag_count(self) -> int:
"""Get total number of tags in the index."""
with self._lock:
return sum(len(results) for results in self._file_index.values())
def clear_index(self) -> None:
"""Clear the entire index."""
with self._lock:
self._file_index.clear()
self._tag_index.clear()
self._file_timestamps.clear()
self._indexed_folders.clear()
# Reset the event to avoid loop attachment issues
self._indexing_complete_event = None
log.debug("Tag index cleared")
# Global tag index instance
tag_index = TagIndex()
class QuickPanelBuilder:
"""Builder for creating enhanced quick panel items."""
TAG_KINDS = {
'TODO': (sublime.KIND_ID_FUNCTION, "T", "Todo"),
'FIXME': (sublime.KIND_ID_VARIABLE, "F", "Fix Me"),
'Important': (sublime.KIND_ID_MARKUP, "!", "Important"),
'Question': (sublime.KIND_ID_NAMESPACE, "?", "Question"),
'Deprecated': (sublime.KIND_ID_TYPE, "D", "Deprecated"),
'UNDEFINED': (sublime.KIND_ID_SNIPPET, "U", "Undefined"),
}
@classmethod
def create_tag_panel_items(cls, results: List[TagResult]) -> List[sublime.QuickPanelItem]:
"""Create quick panel items for tag results."""
return [cls._create_tag_item(result) for result in results]
@classmethod
def _create_tag_item(cls, result: TagResult) -> sublime.QuickPanelItem:
"""Create a single quick panel item for a tag result."""
kind = cls.TAG_KINDS.get(result.tag, (sublime.KIND_ID_MARKUP, "C", "Comment"))
comment_text = result.line.strip()
if len(comment_text) > 120:
comment_text = comment_text[:117] + "..."
trigger = f"[{result.tag}] {comment_text}"
annotation = f"{result.relative_path}:{result.line_num}"
tag_emoji = settings.get_icon_emoji(result.tag)
file_icon = "📄" if result.file.endswith('.py') else "📝"
details = [
f"<div style='padding: 2px 0;'>"
f"<span style='color: var(--accent);'>{tag_emoji} {result.tag}</span> "
f"<span style='color: var(--foreground);'>in</span> "
f"<span style='color: var(--bluish);'>{file_icon} {result.relative_path}</span>"
f"</div>",
f"<div style='padding: 2px 0; font-size: 0.9em; color: var(--foreground);'>"
f"<span style='color: var(--accent);'>Line {result.line_num}:</span> "
f"<code style='background: var(--background); padding: 1px 3px; border-radius: 2px;'>"
f"{sublime.html.escape(comment_text)}"
f"</code>"
f"</div>"
]
return sublime.QuickPanelItem(
trigger=trigger,
details=details,
annotation=annotation,
kind=kind
)
class ViewportManager:
"""Manages viewport positions for preview functionality."""
def __init__(self, window: sublime.Window):
self.window = window
self.original_positions = {}
self._store_original_positions()
def _store_original_positions(self):
"""Store original viewport positions for all open views."""
for view in self.window.views():
if view.file_name():
self.original_positions[view.file_name()] = {
'viewport_position': view.viewport_position(),
'selection': [r for r in view.sel()],
'view_id': view.id()
}
def restore_original_positions(self):
"""Restore all views to their original positions."""
for file_path, pos_data in self.original_positions.items():
for view in self.window.views():
if (view.file_name() == file_path and
view.id() == pos_data['view_id']):
view.set_viewport_position(pos_data['viewport_position'], False)
view.sel().clear()
for region in pos_data['selection']:
view.sel().add(region)
break
def preview_location(self, result: TagResult):
"""Preview a location with enhanced status display."""
tag_emoji = settings.get_icon_emoji(result.tag)
preview_line = result.line.strip()[:100]
sublime.status_message(f"{tag_emoji} [{result.tag}] {preview_line}")
# Navigate to location
target_view = self._find_view_for_file(result.file)
if target_view:
self._navigate_existing_view(target_view, result.line_num)
else:
self._open_transient_preview(result)
def _find_view_for_file(self, file_path: str) -> Optional[sublime.View]:
"""Find existing view for a file."""
return next((v for v in self.window.views() if v.file_name() == file_path), None)
def _navigate_existing_view(self, view: sublime.View, line_num: int):
"""Navigate within an existing view."""
point = view.text_point(line_num - 1, 0)
view.sel().clear()
view.sel().add(point)
view.show_at_center(point)
self.window.focus_view(view)
def _open_transient_preview(self, result: TagResult):
"""Open file as transient preview."""
preview_view = self.window.open_file(
f"{result.file}:{result.line_num}",
sublime.ENCODED_POSITION | sublime.TRANSIENT
)
def center_preview():
if not preview_view.is_loading():
point = preview_view.text_point(result.line_num - 1, 0)
preview_view.show_at_center(point)
else:
sublime.set_timeout(center_preview, 10)
sublime.set_timeout(center_preview, 10)
class ColoredCommentsEditSchemeCommand(sublime_plugin.WindowCommand):
"""Command to edit color scheme with enhanced scheme selection."""
def run(self):
current_scheme = self._get_current_scheme()
schemes = self._get_available_schemes(current_scheme)
def on_done(i):
if i >= 0:
self._open_scheme(schemes[i][2])
self.window.show_quick_panel(schemes, on_done)
def _get_current_scheme(self) -> str:
"""Get current color scheme path."""
view = self.window.active_view()
scheme = (view.settings().get("color_scheme") if view else None) or \
sublime.load_settings("Preferences.sublime-settings").get("color_scheme")
if scheme and not scheme.startswith('Packages/'):
scheme = '/'.join(['Packages'] + scheme.split('/')[1:]) if '/' in scheme else scheme
return scheme or DEFAULT_CS
def _get_available_schemes(self, current_scheme: str) -> List[List[str]]:
"""Get list of available color schemes."""
schemes = [['Edit Current: ' + current_scheme.split('/')[-1], current_scheme, current_scheme]]
resources = sublime.find_resources("*.sublime-color-scheme") + sublime.find_resources("*.tmTheme")
schemes.extend([[r.split('/')[-1], r, r] for r in resources])
return schemes
def _open_scheme(self, scheme_path: str):
"""Open and potentially inject template into scheme."""
try:
resource = ResourcePath.from_file_path(scheme_path)
new_view = self.window.open_file(str(resource))
def check_loaded_and_inject():
if new_view.is_loading():
sublime.set_timeout(check_loaded_and_inject, 50)
else:
self._inject_scheme_template(new_view)
check_loaded_and_inject()
except Exception as e:
log.debug(f"Error opening scheme: {e}")
def _inject_scheme_template(self, view: sublime.View):
"""Inject scheme template if needed."""
content = view.substr(sublime.Region(0, view.size()))
if "comments.important" not in content:
insertion_point = view.size()
if content.strip().endswith('}'):
lines = content.split('\n')
for i in range(len(lines) - 1, -1, -1):
if '}' in lines[i]:
insertion_point = sum(len(line) + 1 for line in lines[:i])
break
view.run_command('insert', {
'characters': '\n' + SCHEME_TEMPLATE.rstrip() + '\n'
})
class ColoredCommentsEventListener(sublime_aio.ViewEventListener):
"""Optimized event listener using new structure."""
def __init__(self, view):
super().__init__(view)
self.manager = CommentDecorationManager(view)
@sublime_aio.debounced(settings.debounce_delay)
async def on_modified(self):
"""Handle view modifications."""
if self.view.settings().get("syntax") not in settings.disabled_syntax:
await self.manager.apply_decorations()
# Update tag index for this file if it has a file path
if self.view.file_name():
tag_index.update_file_index(self.view.file_name(), self.view.window())
async def on_load(self):
"""Handle view loading."""
if self.view.settings().get("syntax") not in settings.disabled_syntax:
await self.manager.apply_decorations()
# Check if we need to build index when loading files in a project
if self.view.file_name() and self.view.window() and self.view.window().folders():
if not tag_index.is_indexed(self.view.window().folders()):
def on_index_done(future):
try:
future.result()
except Exception as e:
log.debug(f"Error building index on file load: {e}")
sublime_aio.run_coroutine(
tag_index.build_initial_index(self.view.window())
).add_done_callback(on_index_done)
async def on_activated(self):
"""Handle view activation."""
if self.view.settings().get("syntax") not in settings.disabled_syntax:
await self.manager.apply_decorations()
def on_close(self):
"""Handle view closing."""
self.manager.clear_decorations()
class ColoredCommentsWindowEventListener(sublime_plugin.EventListener):
"""Window-level event listener for tag index management."""
def on_window_command(self, window, command_name, args):
"""Handle window commands that might affect project structure."""
if command_name in ['new_window', 'close_window', 'open_project', 'close_project']:
# Delay to let the window/project state settle
sublime.set_timeout(lambda: self._check_index_for_window(window), 200)
def on_load_project(self, window):
"""Handle project loading."""
sublime.set_timeout(lambda: self._check_index_for_window(window), 300)
def on_activated(self, view):
"""Handle view activation - check if we need to build index."""
if view and view.window() and view.window().folders():
window = view.window()
if not tag_index.is_indexed(window.folders()) and not tag_index.is_indexing():
def on_index_done(future):
try:
future.result()
except Exception as e:
log.debug(f"Error building index on view activation: {e}")
sublime_aio.run_coroutine(
tag_index.build_initial_index(window)
).add_done_callback(on_index_done)
def _check_index_for_window(self, window):
"""Check if window needs index building."""
if window and window.folders():
if not tag_index.is_indexed(window.folders()) and not tag_index.is_indexing():
def on_index_done(future):
try:
future.result()
except Exception as e:
log.debug(f"Error building index for window: {e}")
sublime_aio.run_coroutine(
tag_index.build_initial_index(window)
).add_done_callback(on_index_done)
class ColoredCommentsCommand(sublime_aio.ViewCommand):
"""Manual decoration command."""
async def run(self):
manager = CommentDecorationManager(self.view)
if not manager.should_process_view():
sublime.status_message("View type not supported for colored comments")
return
manager._last_change_count = 0 # Force update
await manager.apply_decorations()
sublime.status_message("Comment decorations applied")
class ColoredCommentsListTagsCommand(sublime_aio.WindowCommand):
"""Enhanced tag listing command with optimized processing."""
async def run(self, tag_filter=None, current_file_only=False):
if tag_filter and tag_filter not in settings.tag_regex:
available_tags = ", ".join(settings.tag_regex.keys())
sublime.error_message(f"Unknown tag filter: '{tag_filter}'\nAvailable tags: {available_tags}")
return
try:
# Check if we need to build initial index
if not tag_index.is_indexed(self.window.folders()) and not tag_index.is_indexing():
sublime.status_message("🔍 Tag index not found, building now...")
await tag_index.build_initial_index(self.window)
elif tag_index.is_indexing():
sublime.status_message("⏳ Waiting for tag index to complete...")
# Wait for indexing to complete
await tag_index.wait_for_indexing_complete()
# Get current file path for current_file_only mode
current_file_path = None
if current_file_only:
active_view = self.window.active_view()
if active_view and active_view.file_name():
current_file_path = active_view.file_name()
# Get results from index (very fast!)
scope_text = "current file" if current_file_only else "project"
filter_text = f" ({tag_filter} tags)" if tag_filter else ""
sublime.status_message(f"📋 Loading {scope_text} tags{filter_text}...")
results = tag_index.get_all_tags(
tag_filter=tag_filter,
current_file_only=current_file_only,
current_file_path=current_file_path
)
if results:
self._show_results(results, tag_filter, current_file_only)
scope_text = "current file" if current_file_only else "project"
filter_text = f" (filtered by '{tag_filter}')" if tag_filter else ""
sublime.status_message(f"Found {len(results)} comment tags in {scope_text}{filter_text}")
else:
scope_text = "current file" if current_file_only else "project"
filter_text = f" matching '{tag_filter}'" if tag_filter else ""
sublime.status_message(f"No comment tags found in {scope_text}{filter_text}")
except Exception as e:
log.debug(f"Error in tag listing: {e}")
sublime.error_message(f"Error listing tags: {str(e)}")
def _show_results(self, results: List[TagResult], tag_filter=None, current_file_only=False):
"""Show results using optimized components."""
results.sort(key=lambda x: (x.tag, x.relative_path, x.line_num))
viewport_manager = ViewportManager(self.window)
panel_items = QuickPanelBuilder.create_tag_panel_items(results)
scope_text = "Current File" if current_file_only else "Project"
filter_text = f" - {tag_filter} Tags" if tag_filter else " - All Tags"
header_text = f"{scope_text}{filter_text} ({len(results)} found)"
def on_done(index):
if index >= 0:
result = results[index]
self.window.open_file(f"{result.file}:{result.line_num}", sublime.ENCODED_POSITION)
else:
viewport_manager.restore_original_positions()
def on_highlight(index):
if index >= 0:
viewport_manager.preview_location(results[index])