
Commit 14c2187

Add CSV processing and full-text search
Add robust CSV ingestion and full-text search support for prospects.

- Add `app/api/prospects/process.py`: new `/prospects/process` endpoint to batch-process big.csv and insert rows with a computed `search_vector` (`to_tsvector`) for full-text search.
- Update seed logic (`app/api/prospects/seed.py`): remove secondary-email columns, create a `search_vector` tsvector column, add a GIN index, and insert rows with `to_tsvector('english', ...)` populated from concatenated text fields.
- Limit the prospects listing (`app/api/prospects/prospects.py`) to 200 rows and update the response meta to note the limit.
- Register the new process router in `app/api/routes.py`.
- Update README.md with documentation on the tsvector column, the GIN index, the `/prospects/process` endpoint, and the recommended ingestion workflow.

These changes enable fast, scalable full-text search across all text fields and provide a dedicated endpoint for processing large CSV datasets using batch inserts.
1 parent 5a8fa4b commit 14c2187

5 files changed

Lines changed: 133 additions & 10 deletions


README.md

Lines changed: 33 additions & 1 deletion
````diff
@@ -26,7 +26,20 @@ The API is at <http://localhost:8000>.
 
 - **Python 3.11+**
 - **Postgres**
-- **tsvector** - Superfast search
+- **tsvector** - Superfast full-text search (with GIN index)
+
+### Full-Text Search (tsvector)
+
+The prospects table includes a `search_vector` column (type: `tsvector`) that is automatically computed from all text fields on insert. A GIN index is created for this column, enabling fast and scalable full-text search queries.
+
+**How it works:**
+
+- On every insert (via `/prospects/seed` or `/prospects/process`), the `search_vector` is computed from all text columns using PostgreSQL's `to_tsvector('english', ...)`.
+- The GIN index (`idx_prospects_search_vector`) allows efficient search queries like:
+
+```sql
+SELECT * FROM prospects WHERE search_vector @@ plainto_tsquery('english', 'search terms');
+```
+
+This makes searching across all text fields in the prospects table extremely fast, even for large datasets.
 - **FastAPI** — RESTful API framework
 - **Uvicorn** — ASGI server
 - **Pytest** — testing framework
````
```diff
@@ -60,5 +73,24 @@ requirements.txt
 | GET | `/` | Welcome message |
 | GET | `/health` | Health check — returns `ok` |
 | POST | `/echo` | Echoes the JSON `message` field |
+| GET | `/prospects/seed` | (Re)create prospects table and seed with sample data |
+| DELETE | `/prospects/empty` | Empties the prospects table |
+| GET | `/prospects/process` | Process and insert all records from big.csv into the prospects table |
+
+### Processing Large CSV Files
+
+The `/prospects/process` endpoint is designed for robust, scalable ingestion of large CSV files (e.g., 1,300+ rows, 300 KB+). It follows the same normalization and insertion pattern as `/prospects/seed`, but uses batched inserts optimized for large files.
+
+#### Example usage
+
+1. Seed the table structure:
+   - `GET /prospects/seed`
+2. (Optional) Empty the table:
+   - `DELETE /prospects/empty`
+3. Process the large CSV:
+   - `GET /prospects/process`
+
+The endpoint returns the number of records inserted. This is the core ingestion workflow for production-scale data.
```
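The batched-insert workflow described above can be sketched without a database. This is a minimal illustration of the accumulate-and-flush pattern the endpoint uses; `chunk_rows` is an illustrative helper, not a name from the codebase:

```python
def chunk_rows(rows, batch_size):
    """Yield successive batches of at most batch_size rows each."""
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:  # flush the final partial batch
        yield batch

# 7 rows with a batch size of 3 yield two full batches and one partial one.
print(list(chunk_rows(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```

In `/prospects/process`, each such batch corresponds to one `executemany` call, which keeps memory use bounded regardless of CSV size.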

app/api/prospects/process.py

Lines changed: 71 additions & 0 deletions
```python
import csv
import os
import re

import psycopg2
from fastapi import APIRouter, status

from app.utils.db import get_db_connection

router = APIRouter()

CSV_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), 'data/big.csv'))

BATCH_SIZE = 200


def normalize_column(col):
    """Normalize a CSV header into a safe SQL identifier."""
    col = col.strip().lower().replace(' ', '_')
    col = re.sub(r'[^a-z0-9_]', '', col)
    if col and col[0].isdigit():
        col = '_' + col
    return col


@router.get("/prospects/process", status_code=status.HTTP_200_OK)
def process_prospects() -> dict:
    """
    Process and insert data from the large CSV file (big.csv) into the prospects table.
    The table must already exist with the correct columns (run seed and empty first).
    Rows are inserted in batches of BATCH_SIZE for scalability.
    """
    conn_gen = get_db_connection()
    conn = next(conn_gen)
    cur = conn.cursor()
    inserted = 0
    try:
        with open(CSV_PATH, newline='', encoding='utf-8') as csvfile:
            reader = csv.reader(csvfile)
            columns_raw = next(reader)
            # Drop the secondary-email columns, matching the seeded schema.
            remove_cols = {'secondary_email', 'secondary_email_source',
                           'secondary_email_status', 'secondary_email_verification_source'}
            normalized = [normalize_column(col) for col in columns_raw]
            columns = [col for col in normalized if col not in remove_cols]
            col_indices = [i for i, col in enumerate(normalized) if col not in remove_cols]
            placeholders = ', '.join(['%s'] * len(columns))
            insert_sql = (
                f"INSERT INTO prospects ({', '.join(columns)}, search_vector) "
                f"VALUES ({placeholders}, to_tsvector('english', %s))"
            )
            batch = []
            for row in reader:
                filtered_row = [row[i] for i in col_indices]
                # Concatenate all text fields to feed the tsvector.
                text_content = ' '.join(str(val) for val in filtered_row if val is not None)
                batch.append(filtered_row + [text_content])
                if len(batch) >= BATCH_SIZE:
                    cur.executemany(insert_sql, batch)
                    inserted += len(batch)
                    batch = []
            if batch:  # flush the final partial batch
                cur.executemany(insert_sql, batch)
                inserted += len(batch)
        conn.commit()
        result = {"detail": f"Inserted {inserted} records from big.csv into prospects table."}
    except psycopg2.errors.UndefinedTable:
        conn.rollback()
        result = {"detail": "Table 'prospects' does not exist. No records inserted."}
    except Exception as e:
        conn.rollback()
        result = {"detail": f"Error: {str(e)}"}
    finally:
        cur.close()
        conn.close()
    return result
```
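The `normalize_column` helper is pure Python and can be sanity-checked in isolation. The header strings below are made-up examples, not columns from big.csv:

```python
import re

def normalize_column(col):
    """Normalize a CSV header into a safe SQL identifier (as in process.py)."""
    col = col.strip().lower().replace(' ', '_')
    col = re.sub(r'[^a-z0-9_]', '', col)
    if col and col[0].isdigit():
        col = '_' + col
    return col

print(normalize_column('Secondary Email'))   # secondary_email
print(normalize_column('E-mail (Status)'))   # email_status
print(normalize_column('3rd Party Source'))  # _3rd_party_source
```

Spaces become underscores, punctuation is stripped, and a leading underscore is added when the name would otherwise start with a digit, which PostgreSQL does not allow in an unquoted identifier.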

app/api/prospects/prospects.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -24,13 +24,13 @@ def root() -> dict:
         },
     ]
     try:
-        cur.execute('SELECT * FROM prospects;')
+        cur.execute('SELECT * FROM prospects LIMIT 200;')
         if cur.description is None:
             prospects = []
         else:
             columns = [desc[0] for desc in cur.description]
             prospects = [dict(zip(columns, row)) for row in cur.fetchall()]
-        meta = make_meta("success", "Prospects List")
+        meta = make_meta("success", "Prospects List (max 200)")
         result = {"meta": meta, "data": prospects}
     except Exception as e:
         import psycopg2
```

app/api/prospects/seed.py

Lines changed: 23 additions & 6 deletions
```diff
@@ -29,19 +29,36 @@ def seed_prospects() -> dict:
     import io
     reader = csv.reader(io.StringIO(csv_data))
     columns_raw = next(reader)
-    columns = [normalize_column(col) for col in columns_raw]
+    # Remove 'Secondary Email' column and its variants
+    remove_cols = {'secondary_email', 'secondary_email_source', 'secondary_email_status', 'secondary_email_verification_source'}
+    columns = [normalize_column(col) for col in columns_raw if normalize_column(col) not in remove_cols]
+    col_indices = [i for i, col in enumerate([normalize_column(col) for col in columns_raw]) if col not in remove_cols]
 
-    # Drop and recreate table
+    # Drop and recreate table with tsvector column
    cur.execute('DROP TABLE IF EXISTS prospects;')
    create_cols = ',\n    '.join([f'{col} TEXT' for col in columns])
-    cur.execute(f'''CREATE TABLE prospects (\n    id SERIAL PRIMARY KEY,\n    {create_cols}\n);''')
+    cur.execute(f'''
+        CREATE TABLE prospects (
+            id SERIAL PRIMARY KEY,
+            {create_cols},
+            search_vector tsvector
+        );
+    ''')
+    # Create GIN index for full-text search
+    cur.execute('CREATE INDEX IF NOT EXISTS idx_prospects_search_vector ON prospects USING GIN (search_vector);')
 
-    # Insert rows
+    # Insert rows with tsvector
     for row in reader:
+        # Only keep values for columns we want
+        filtered_row = [row[i] for i in col_indices]
         placeholders = ', '.join(['%s'] * len(columns))
+        # Concatenate all text fields for tsvector
+        text_content = ' '.join([str(val) for val in filtered_row if val is not None])
         cur.execute(
-            f"INSERT INTO prospects ({', '.join(columns)}) VALUES ({placeholders})",
-            row
+            f"INSERT INTO prospects ({', '.join(columns)}, search_vector) VALUES ({placeholders}, to_tsvector('english', %s))",
+            filtered_row + [text_content]
         )
 
     conn.commit()
```
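The column filtering and tsvector source text built in the seed logic can be sketched without Postgres. The header row and values here are made-up examples; `normalize_column` is reproduced to keep the sketch self-contained:

```python
import csv
import io
import re

def normalize_column(col):
    """Normalize a CSV header into a safe SQL identifier."""
    col = col.strip().lower().replace(' ', '_')
    col = re.sub(r'[^a-z0-9_]', '', col)
    if col and col[0].isdigit():
        col = '_' + col
    return col

remove_cols = {'secondary_email', 'secondary_email_source',
               'secondary_email_status', 'secondary_email_verification_source'}

csv_data = 'First Name,Email,Secondary Email,Company\nAda,ada@example.com,ada@alt.example,Initech\n'
reader = csv.reader(io.StringIO(csv_data))
columns_raw = next(reader)

normalized = [normalize_column(c) for c in columns_raw]
columns = [c for c in normalized if c not in remove_cols]
col_indices = [i for i, c in enumerate(normalized) if c not in remove_cols]

row = next(reader)
filtered_row = [row[i] for i in col_indices]
# This concatenation is what feeds to_tsvector('english', %s) on insert.
text_content = ' '.join(str(v) for v in filtered_row if v is not None)

print(columns)       # ['first_name', 'email', 'company']
print(filtered_row)  # ['Ada', 'ada@example.com', 'Initech']
print(text_content)  # Ada ada@example.com Initech
```

The secondary-email column is dropped from both the schema and each row, and the remaining values are joined into one string so Postgres can build the `search_vector` from it.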

app/api/routes.py

Lines changed: 4 additions & 1 deletion
```diff
@@ -15,11 +15,14 @@
 from app.api.prospects.prospects import router as prospects_router
 
+
 from app.api.prospects.seed import router as prospects_seed_router
 from app.api.prospects.empty import router as prospects_empty_router
+from app.api.prospects.process import router as prospects_process_router
 
 router.include_router(root_router)
 router.include_router(health_router)
 router.include_router(prospects_router)
 router.include_router(prospects_seed_router)
-router.include_router(prospects_empty_router)
+router.include_router(prospects_empty_router)
+router.include_router(prospects_process_router)
```
