Skip to content

Commit 13d1b26

Browse files
committed
pt 1 refactor
1 parent 2b8474c commit 13d1b26

13 files changed

Lines changed: 318 additions & 62 deletions

File tree

stringsight/api.py

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
import io
1818
import os
1919
import time
20+
from collections import OrderedDict
2021

2122
import pandas as pd
2223
from fastapi import FastAPI, UploadFile, File, HTTPException, Body, Query, Depends, BackgroundTasks
2324
from fastapi.middleware.cors import CORSMiddleware
2425
from fastapi.responses import StreamingResponse
26+
from fastapi.staticfiles import StaticFiles
2527
from pydantic import BaseModel
2628
from pathlib import Path
2729

@@ -52,11 +54,14 @@
5254
from stringsight.utils.paths import _get_persistent_data_dir, _get_results_dir, _get_cache_dir
5355

5456
# -------------------------------------------------------------------------
55-
# Simple in-memory cache for parsed JSONL data with TTL
57+
# Bounded LRU cache for parsed JSONL data with TTL
5658
# -------------------------------------------------------------------------
57-
_JSONL_CACHE: Dict[str, tuple[List[Dict[str, Any]], datetime]] = {}
59+
_MAX_CACHE_ENTRIES = 10 # Maximum number of files to cache
60+
_MAX_CACHE_SIZE_MB = 100 # Maximum total cache size in MB
5861
_CACHE_TTL = timedelta(minutes=15) # Cache for 15 minutes
62+
_JSONL_CACHE: OrderedDict[str, tuple[List[Dict[str, Any]], datetime, int]] = OrderedDict() # key -> (data, timestamp, size_bytes)
5963
_CACHE_LOCK = threading.Lock()
64+
_cache_stats = {"hits": 0, "misses": 0, "evictions": 0, "total_size_mb": 0.0}
6065

6166
def _get_file_hash(path: Path) -> str:
6267
"""Get a hash of file path and modification time for cache key."""
@@ -65,10 +70,15 @@ def _get_file_hash(path: Path) -> str:
6570
return hashlib.md5(key_str.encode()).hexdigest()
6671

6772
def _get_cached_jsonl(path: Path, nrows: int | None = None) -> List[Dict[str, Any]]:
68-
"""Read JSONL file with caching. Cache key includes file mtime to auto-invalidate on changes.
73+
"""Read JSONL file with bounded LRU caching. Cache key includes file mtime to auto-invalidate on changes.
6974
7075
Only caches full file reads (nrows=None) to avoid cache bloat. For partial reads,
7176
reads directly from disk.
77+
78+
Cache is bounded by:
79+
- Maximum 10 files
80+
- Maximum 100MB total size
81+
- 15-minute TTL
7282
"""
7383
# Only cache full file reads to avoid memory bloat
7484
if nrows is not None:
@@ -79,24 +89,46 @@ def _get_cached_jsonl(path: Path, nrows: int | None = None) -> List[Dict[str, An
7989

8090
with _CACHE_LOCK:
8191
if cache_key in _JSONL_CACHE:
82-
cached_data, cached_time = _JSONL_CACHE[cache_key]
92+
cached_data, cached_time, cached_size = _JSONL_CACHE[cache_key]
8393
# Check if cache is still valid
8494
if datetime.now() - cached_time < _CACHE_TTL:
95+
# Move to end (mark as recently used)
96+
_JSONL_CACHE.move_to_end(cache_key)
97+
_cache_stats["hits"] += 1
8598
logger.debug(f"Cache hit for {path.name}")
8699
return cached_data
87100
else:
88101
# Remove expired entry
89102
del _JSONL_CACHE[cache_key]
103+
_cache_stats["total_size_mb"] -= cached_size / (1024 * 1024)
90104
logger.debug(f"Cache expired for {path.name}")
91105

92106
# Cache miss - read from disk
93-
logger.debug(f"Cache miss for {path.name}, reading from disk")
107+
_cache_stats["misses"] += 1
108+
logger.debug(f"Cache miss for {path.name}, reading from disk (hits: {_cache_stats['hits']}, misses: {_cache_stats['misses']})")
94109
data = _read_jsonl_as_list(path, nrows)
95110

96111
# Store in cache (only if full file read)
97112
if nrows is None:
113+
import sys
114+
data_size = sys.getsizeof(data)
115+
data_size_mb = data_size / (1024 * 1024)
116+
98117
with _CACHE_LOCK:
99-
_JSONL_CACHE[cache_key] = (data, datetime.now())
118+
# Evict entries if needed to stay under limits
119+
while len(_JSONL_CACHE) >= _MAX_CACHE_ENTRIES or _cache_stats["total_size_mb"] + data_size_mb > _MAX_CACHE_SIZE_MB:
120+
if not _JSONL_CACHE:
121+
break
122+
# Remove oldest entry (FIFO eviction)
123+
evicted_key, (_, _, evicted_size) = _JSONL_CACHE.popitem(last=False)
124+
_cache_stats["total_size_mb"] -= evicted_size / (1024 * 1024)
125+
_cache_stats["evictions"] += 1
126+
logger.debug(f"Evicted cache entry (total evictions: {_cache_stats['evictions']})")
127+
128+
# Add new entry
129+
_JSONL_CACHE[cache_key] = (data, datetime.now(), data_size)
130+
_cache_stats["total_size_mb"] += data_size_mb
131+
logger.debug(f"Cached {path.name} ({data_size_mb:.2f}MB, total cache: {_cache_stats['total_size_mb']:.2f}MB)")
100132

101133
return data
102134

@@ -130,7 +162,7 @@ def _resolve_within_base(user_path: str) -> Path:
130162
target = (base / target).resolve() if not target.is_absolute() else target.resolve()
131163
try:
132164
target.relative_to(base)
133-
except Exception:
165+
except ValueError:
134166
raise HTTPException(status_code=400, detail="Path is outside the allowed base directory")
135167
if not target.exists():
136168
raise HTTPException(status_code=404, detail=f"Path not found: {target}")
@@ -371,6 +403,12 @@ def _startup_init_db() -> None:
371403
app.include_router(extraction.router)
372404
app.include_router(clustering.router)
373405

406+
# Mount final_results directory as static files for direct file access
407+
final_results_path = Path("final_results")
408+
if final_results_path.exists():
409+
app.mount("/final_results", StaticFiles(directory=str(final_results_path)), name="final_results")
410+
logger.info(f"Mounted /final_results from {final_results_path.absolute()}")
411+
374412
# NOTE:
375413
# All of the primary API endpoints are implemented in `stringsight/routers/*` and
376414
# are registered above via `app.include_router(...)`.
@@ -410,7 +448,7 @@ def _run_cluster_job(job: ClusterJob, req: ClusterRunRequest):
410448
try:
411449
asyncio.run(_run_cluster_job_async(job, req))
412450
except Exception as e:
413-
logger.error(f"Error in background cluster job: {e}")
451+
logger.error(f"Error in background cluster job {job.id}: {e}", exc_info=True)
414452
with _CLUSTER_JOBS_LOCK:
415453
job.state = "error"
416454
job.error = str(e)
@@ -449,11 +487,13 @@ async def _run_cluster_job_async(job: ClusterJob, req: ClusterRunRequest):
449487
if hasattr(_llm_utils, "_default_llm_utils"):
450488
_llm_utils._default_llm_utils = None # type: ignore
451489
except Exception:
490+
# Intentionally silent - cache clearing is best-effort
452491
pass
453492
try:
454493
if hasattr(_cu, "_cache"):
455494
_cu._cache = None # type: ignore
456495
except Exception:
496+
# Intentionally silent - cache clearing is best-effort
457497
pass
458498

459499
# Preprocess operationalRows to handle score_columns conversion
@@ -545,8 +585,8 @@ async def _run_cluster_job_async(job: ClusterJob, req: ClusterRunRequest):
545585
meta=p.get("meta", {})
546586
)
547587
properties.append(prop)
548-
except Exception as e:
549-
logger.warning(f"Skipping invalid property: {e}")
588+
except (KeyError, TypeError, ValueError) as e:
589+
logger.warning(f"Skipping invalid property (missing/invalid fields): {e}")
550590
continue
551591

552592
if not properties:

stringsight/clusterers/base.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,12 +291,26 @@ def _build_clusters_from_df(self, df: pd.DataFrame, column_name: str) -> List[Cl
291291
292292
Group rows by cluster id, extract labels and collect
293293
`question_id`, `{column_name}` and `id` values for each cluster.
294+
295+
Filters out rows where property_description is empty or NaN to ensure
296+
only valid properties are included in clusters.
294297
"""
295298
label_col = f"{column_name}_cluster_label"
296299
id_col = f"{column_name}_cluster_id"
297300

301+
# Filter out invalid properties (empty or NaN property descriptions)
302+
if column_name in df.columns:
303+
valid_mask = df[column_name].notna() & (df[column_name].astype(str).str.strip() != "")
304+
df_filtered = df[valid_mask].copy()
305+
306+
invalid_count = len(df) - len(df_filtered)
307+
if invalid_count > 0:
308+
self.log(f"Filtered out {invalid_count} properties with empty descriptions from clustering")
309+
else:
310+
df_filtered = df
311+
298312
clusters: List[Cluster] = []
299-
for cid, group in df.groupby(id_col):
313+
for cid, group in df_filtered.groupby(id_col):
300314
cid_group = group[group[id_col] == cid]
301315
label = str(cid_group[label_col].iloc[0])
302316

stringsight/clusterers/hdbscan.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,14 @@ async def cluster(self, data: PropertyDataset, column_name: str, progress_callba
131131
# `to_dataframe(type="properties")` may include conversation rows without any extracted
132132
# properties (i.e., missing/NaN `column_name`). Those rows cannot be clustered and can
133133
# also coerce cluster id dtypes to float downstream, which breaks group metadata mapping.
134+
# Also filter out empty strings to ensure only valid properties are clustered.
134135
if column_name in df.columns:
136+
initial_count = len(df)
135137
df = df[df[column_name].notna()].copy()
138+
df = df[df[column_name].astype(str).str.strip() != ""].copy()
139+
filtered_count = initial_count - len(df)
140+
if filtered_count > 0:
141+
self.log(f"Filtered out {filtered_count} properties with empty or missing descriptions before clustering")
136142

137143
if getattr(self, "verbose", False):
138144
logger.debug(f"DataFrame shape after to_dataframe: {df.shape}")
@@ -251,7 +257,8 @@ async def _cluster_group_async(group_info):
251257
if progress_callback:
252258
try:
253259
progress_callback((i + 1) / total_groups)
254-
except Exception:
260+
except Exception as e:
261+
logger.debug(f"Progress callback failed: {e}")
255262
pass
256263
clustered_df = pd.concat(clustered_parts, ignore_index=True)
257264
else:

stringsight/db_models/job.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class Job(Base): # type: ignore[misc, valid-type]
3030
error_message = Column(Text, nullable=True)
3131

3232
# Timestamps
33-
created_at = Column(DateTime(timezone=True), server_default=func.now())
33+
created_at = Column(DateTime(timezone=True), server_default=func.now(), index=True)
3434
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
3535

3636
# Relationships

stringsight/logging_config.py

Lines changed: 71 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,27 +5,39 @@
55
via environment variables:
66
- STRINGSIGHT_LOG_LEVEL: Set logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
77
- STRINGSIGHT_LOG_FORMAT: Custom log format (optional)
8+
- STRINGSIGHT_JSON_LOGS: Enable JSON structured logging (true/false)
9+
10+
Default format includes timestamp, level, module name, and message:
11+
[2024-01-15 10:30:45] INFO [module.name] Message here
812
913
Usage:
1014
from stringsight.logging_config import get_logger
11-
15+
1216
logger = get_logger(__name__)
1317
logger.debug("Debug message")
1418
logger.info("Info message")
1519
logger.warning("Warning message")
16-
logger.error("Error message")
20+
logger.error("Error message", exc_info=True) # Include stack trace
21+
22+
# Add extra context fields
23+
logger.error("Job failed", exc_info=True, extra={"job_id": job_id})
1724
"""
1825

1926
import logging
2027
import os
2128
import sys
22-
from typing import Optional
29+
import json
30+
from typing import Optional, Any, Dict
2331

2432

25-
# Default log format - simple format without timestamp/level for cleaner output
26-
DEFAULT_LOG_FORMAT = "%(message)s"
33+
# Default log format - includes timestamp, level, module name, and message
34+
# Format: [2024-01-15 10:30:45] INFO [module.name] Message here
35+
DEFAULT_LOG_FORMAT = "[%(asctime)s] %(levelname)s [%(name)s] %(message)s"
2736
DEFAULT_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
2837

38+
# Simple format option (for backwards compatibility, set via env var)
39+
SIMPLE_LOG_FORMAT = "%(message)s"
40+
2941

3042
def _get_log_level() -> int:
3143
"""Get the logging level from environment variable or default to INFO."""
@@ -38,6 +50,38 @@ def _get_log_format() -> str:
3850
return os.environ.get("STRINGSIGHT_LOG_FORMAT", DEFAULT_LOG_FORMAT)
3951

4052

53+
def _use_json_logs() -> bool:
54+
"""Check if JSON logging is enabled via environment variable."""
55+
json_logs_env = os.environ.get("STRINGSIGHT_JSON_LOGS", "false").lower()
56+
return json_logs_env in ("true", "1", "yes")
57+
58+
59+
class JSONFormatter(logging.Formatter):
60+
"""JSON formatter for structured logging in production."""
61+
62+
def format(self, record: logging.LogRecord) -> str:
63+
"""Format log record as JSON."""
64+
log_data: Dict[str, Any] = {
65+
"timestamp": self.formatTime(record, self.datefmt),
66+
"level": record.levelname,
67+
"logger": record.name,
68+
"message": record.getMessage(),
69+
}
70+
71+
# Add exception info if present
72+
if record.exc_info:
73+
log_data["exception"] = self.formatException(record.exc_info)
74+
75+
# Add extra fields if present
76+
if hasattr(record, "job_id"):
77+
log_data["job_id"] = record.job_id
78+
if hasattr(record, "stage"):
79+
log_data["stage"] = record.stage
80+
81+
return json.dumps(log_data)
82+
83+
84+
4185
def configure_logging(
4286
level: Optional[int] = None,
4387
format_string: Optional[str] = None,
@@ -59,15 +103,29 @@ def configure_logging(
59103

60104
if date_format is None:
61105
date_format = DEFAULT_DATE_FORMAT
62-
106+
107+
# Check if JSON logging is enabled
108+
use_json = _use_json_logs()
109+
63110
# Configure the root logger
64-
logging.basicConfig(
65-
level=level,
66-
format=format_string,
67-
datefmt=date_format,
68-
stream=sys.stdout,
69-
force=True # Override any existing configuration
70-
)
111+
if use_json:
112+
# Use JSON formatter for structured logging
113+
handler = logging.StreamHandler(sys.stdout)
114+
handler.setFormatter(JSONFormatter(datefmt=date_format))
115+
logging.basicConfig(
116+
level=level,
117+
handlers=[handler],
118+
force=True
119+
)
120+
else:
121+
# Use standard text formatter
122+
logging.basicConfig(
123+
level=level,
124+
format=format_string,
125+
datefmt=date_format,
126+
stream=sys.stdout,
127+
force=True # Override any existing configuration
128+
)
71129

72130
# Suppress noisy third-party library logs
73131
logging.getLogger("LiteLLM").setLevel(logging.WARNING)

0 commit comments

Comments
 (0)