Skip to content

Commit c1f1be9

Browse files
Merge pull request #27 from goldlabelapps/staging
This pull request introduces several new endpoints, enhancements, and code organization improvements for the prospects API, with a focus on scalable data ingestion, database schema management, and improved documentation.
2 parents 876be5b + 9b24604 commit c1f1be9

13 files changed

Lines changed: 311 additions & 14 deletions

File tree

README.md

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,20 @@ The API is at <http://localhost:8000>.
2626

2727
- **Python 3.11+**
2828
- **Postgres**
29-
- **tsvector** - Superfast search
29+
- **tsvector** - Superfast full-text search (with GIN index)
30+
### Full-Text Search (tsvector)
31+
32+
The prospects table includes a `search_vector` column (type: tsvector) that is automatically computed from all text fields on insert. A GIN index is created for this column, enabling fast and scalable full-text search queries.
33+
34+
**How it works:**
35+
- On every insert (via `/prospects/seed` or `/prospects/process`), the `search_vector` is computed from all text columns using PostgreSQL's `to_tsvector('english', ...)`.
36+
- The GIN index (`idx_prospects_search_vector`) allows efficient search queries like:
37+
38+
```sql
39+
SELECT * FROM prospects WHERE search_vector @@ plainto_tsquery('english', 'search terms');
40+
```
41+
42+
This makes searching across all text fields in the prospects table extremely fast, even for large datasets.
3043
- **FastAPI** — RESTful API framework
3144
- **Uvicorn** — ASGI server
3245
- **Pytest** — testing framework
@@ -60,5 +73,24 @@ requirements.txt
6073
| GET | `/` | Welcome message |
6174
| GET | `/health` | Health check — returns `ok` |
6275
| POST | `/echo` | Echoes the JSON `message` field |
76+
| GET | `/prospects/seed` | (Re)create prospects table and seed with sample data |
77+
| DELETE | `/prospects/empty` | (Legacy) Empties the prospects table |
78+
| GET | `/prospects/process` | Process and insert all records from big.csv into prospects table |
79+
80+
### Processing Large CSV Files
81+
82+
The `/prospects/process` endpoint is designed for robust, scalable ingestion of large CSV files (e.g., 1300+ rows, 300KB+). It follows the same normalization and insertion pattern as `/prospects/seed`, but is optimized for large files.
83+
84+
85+
#### Example usage
86+
87+
1. Seed the table structure:
88+
- `GET /prospects/seed`
89+
2. (Optional) Empty the table:
90+
- `DELETE /prospects/empty`
91+
3. Process the large CSV:
92+
- `GET /prospects/process`
93+
94+
The endpoint will return the number of records inserted. This is the core ingestion workflow for production-scale data.
6395

6496

app/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
"""NX AI - FastAPI/Python/Postgres/tsvector"""
22

33
# Current Version
4-
__version__ = "1.1.2"
4+
__version__ = "1.1.3"
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
from fastapi import APIRouter, status

from app.utils.db import get_db_connection

router = APIRouter()

# Target column for the one-off ALTER below; edit to drop a different column.
COLUMN_NAME = 'tertiary_email_verification_source'


@router.get("/prospects/alter", status_code=status.HTTP_200_OK)
def alter_prospects_table() -> dict:
    """
    Drop COLUMN_NAME from the 'prospects' table, if both the table and the
    column exist.

    Returns:
        dict: {"detail": ...} describing the outcome — success, missing
        table/column, or the database error encountered.
    """
    column_name = COLUMN_NAME
    conn_gen = get_db_connection()
    conn = next(conn_gen)
    cur = conn.cursor()
    try:
        # Check if the 'prospects' table exists.
        cur.execute("""
            SELECT EXISTS (
                SELECT 1 FROM information_schema.tables
                WHERE table_name = 'prospects'
            );
        """)
        table_row = cur.fetchone()
        if table_row is None:
            return {"detail": "Error: Could not fetch table existence result."}
        if not table_row[0]:
            return {"detail": "Table 'prospects' does not exist."}

        # Check if the target column exists (parameterized query — the
        # column name is never interpolated here).
        cur.execute("""
            SELECT EXISTS (
                SELECT 1 FROM information_schema.columns
                WHERE table_name = 'prospects' AND column_name = %s
            );
        """, (column_name,))
        column_row = cur.fetchone()
        if column_row is None:
            return {"detail": "Error: Could not fetch column existence result."}
        if not column_row[0]:
            return {"detail": f"Column '{column_name}' does not exist in 'prospects' table."}

        # Drop the column. DDL identifiers cannot be parameterized, so the
        # name is interpolated from the trusted module constant above.
        try:
            cur.execute(f'ALTER TABLE prospects DROP COLUMN {column_name};')
            conn.commit()
            result = {"detail": f"Column '{column_name}' dropped successfully from 'prospects' table."}
        except Exception as e:
            conn.rollback()
            result = {"detail": f"Failed to drop column: {str(e)}"}
    except Exception as e:
        conn.rollback()
        result = {"detail": f"Error: {str(e)}"}
    finally:
        cur.close()
        conn.close()
    return result
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import os, time
2+
from fastapi import APIRouter, status
3+
from app.utils.db import get_db_connection
4+
5+
router = APIRouter()
6+
7+
CSV_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), 'data/big.csv'))
8+
9+
10+
import csv
11+
import io
12+
13+
def normalize_column(col):
    """Normalize a raw CSV header into a safe snake_case SQL identifier."""
    import re
    cleaned = re.sub(r'[^a-z0-9_]', '', col.strip().lower().replace(' ', '_'))
    # SQL identifiers must not begin with a digit; prefix with an underscore.
    return '_' + cleaned if cleaned[:1].isdigit() else cleaned
20+
21+
@router.get("/prospects/process", status_code=status.HTTP_200_OK)
def process_prospects() -> dict:
    """
    Process and insert data from the large CSV file (big.csv) into the prospects table.
    The table must already exist with the correct columns (run seed and empty first).
    This endpoint is robust and scalable for large files.

    Returns:
        dict: {"detail": ...} with the number of inserted records, or an
        error message when the table is missing or a row fails.
    """
    import psycopg2
    BATCH_SIZE = 200  # rows per executemany() round-trip
    conn_gen = get_db_connection()
    conn = next(conn_gen)
    cur = conn.cursor()
    inserted = 0
    try:
        with open(CSV_PATH, newline='', encoding='utf-8') as csvfile:
            reader = csv.reader(csvfile)
            columns_raw = next(reader)
            # Columns excluded from ingestion (mirrors /prospects/seed).
            remove_cols = {'secondary_email', 'secondary_email_source', 'secondary_email_status', 'secondary_email_verification_source'}
            # Normalize each header exactly once, then derive both the kept
            # column names and their source positions from the same list so
            # the two can never diverge.
            normalized = [normalize_column(col) for col in columns_raw]
            columns = [col for col in normalized if col not in remove_cols]
            col_indices = [i for i, col in enumerate(normalized) if col not in remove_cols]
            placeholders = ', '.join(['%s'] * len(columns))
            # Hoist the loop-invariant INSERT statement; column names come
            # from normalize_column, which restricts them to [a-z0-9_].
            insert_sql = (
                f"INSERT INTO prospects ({', '.join(columns)}, search_vector) "
                f"VALUES ({placeholders}, to_tsvector('english', %s))"
            )
            batch = []
            for row in reader:
                filtered_row = [row[i] for i in col_indices]
                # Concatenate all kept fields for full-text indexing.
                text_content = ' '.join([str(val) for val in filtered_row if val is not None])
                batch.append(filtered_row + [text_content])
                if len(batch) >= BATCH_SIZE:
                    cur.executemany(insert_sql, batch)
                    inserted += len(batch)
                    batch = []
            if batch:
                cur.executemany(insert_sql, batch)
                inserted += len(batch)
        conn.commit()
        result = {"detail": f"Inserted {inserted} records from big.csv into prospects table."}
    except psycopg2.errors.UndefinedTable:
        conn.rollback()
        result = {"detail": "Table 'prospects' does not exist. No records inserted."}
    except Exception as e:
        conn.rollback()
        result = {"detail": f"Error: {str(e)}"}
    finally:
        cur.close()
        conn.close()
    return result
Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,36 @@ def seed_prospects() -> dict:
2929
import io
3030
reader = csv.reader(io.StringIO(csv_data))
3131
columns_raw = next(reader)
32-
columns = [normalize_column(col) for col in columns_raw]
32+
# Remove 'Secondary Email' column and its variants
33+
remove_cols = {'secondary_email', 'secondary_email_source', 'secondary_email_status', 'secondary_email_verification_source'}
34+
columns = [normalize_column(col) for col in columns_raw if normalize_column(col) not in remove_cols]
35+
col_indices = [i for i, col in enumerate([normalize_column(col) for col in columns_raw]) if col not in remove_cols]
3336

34-
# Drop and recreate table
37+
38+
# Drop and recreate table with tsvector column
3539
cur.execute('DROP TABLE IF EXISTS prospects;')
3640
create_cols = ',\n '.join([f'{col} TEXT' for col in columns])
37-
cur.execute(f'''CREATE TABLE prospects (\n id SERIAL PRIMARY KEY,\n {create_cols}\n);''')
41+
cur.execute(f'''
42+
CREATE TABLE prospects (
43+
id SERIAL PRIMARY KEY,
44+
{create_cols},
45+
search_vector tsvector
46+
);
47+
''')
48+
# Create GIN index for full-text search
49+
cur.execute('CREATE INDEX IF NOT EXISTS idx_prospects_search_vector ON prospects USING GIN (search_vector);')
50+
3851

39-
# Insert rows
52+
# Insert rows with tsvector
4053
for row in reader:
54+
# Only keep values for columns we want
55+
filtered_row = [row[i] for i in col_indices]
4156
placeholders = ', '.join(['%s'] * len(columns))
57+
# Concatenate all text fields for tsvector
58+
text_content = ' '.join([str(val) for val in filtered_row if val is not None])
4259
cur.execute(
43-
f"INSERT INTO prospects ({', '.join(columns)}) VALUES ({placeholders})",
44-
row
60+
f"INSERT INTO prospects ({', '.join(columns)}, search_vector) VALUES ({placeholders}, to_tsvector('english', %s))",
61+
filtered_row + [text_content]
4562
)
4663

4764
conn.commit()

app/api/prospects/prospects.py

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,35 @@
66

77
router = APIRouter()
88

9+
10+
# Endpoint to get unique values for specified fields
from fastapi import Query

import re

# Safe SQL identifier: letters/digits/underscores, not starting with a digit.
_FIELD_NAME_RE = re.compile(r'[A-Za-z_][A-Za-z0-9_]*$')


@router.get("/prospects/unique")
def get_unique_fields(fields: list[str] = Query(..., description="List of field names to get unique values for")) -> dict:
    """Return lists of unique values and their counts for specified fields in the prospects table.

    Field names are interpolated into SQL as quoted identifiers, so each one
    is validated against a safe identifier pattern first; names that fail
    validation are reported in the 'errors' map instead of being executed.
    """
    conn_gen = get_db_connection()
    conn = next(conn_gen)
    cur = conn.cursor()
    result = {}
    errors = {}
    try:
        for field in fields:
            # Reject anything that could break out of the double-quoted
            # identifier below (e.g. a field containing a '"').
            if not _FIELD_NAME_RE.match(field):
                errors[field] = "Invalid field name."
                continue
            try:
                cur.execute(f'SELECT "{field}", COUNT(*) FROM prospects WHERE "{field}" IS NOT NULL GROUP BY "{field}" ORDER BY COUNT(*) DESC;')
                result[field] = [
                    {"value": row[0], "count": row[1]} for row in cur.fetchall()
                ]
            except Exception as e:
                errors[field] = str(e)
        meta = make_meta("success", f"Unique values and counts for fields: {fields}")
        return {"meta": meta, "data": result, "errors": errors if errors else None}
    finally:
        cur.close()
        conn.close()
36+
37+
938
@router.get("/prospects")
1039
def root() -> dict:
1140
"""Return all prospects table records"""
@@ -24,13 +53,13 @@ def root() -> dict:
2453
},
2554
]
2655
try:
27-
cur.execute('SELECT * FROM prospects;')
56+
cur.execute('SELECT * FROM prospects LIMIT 200;')
2857
if cur.description is None:
2958
prospects = []
3059
else:
3160
columns = [desc[0] for desc in cur.description]
3261
prospects = [dict(zip(columns, row)) for row in cur.fetchall()]
33-
meta = make_meta("success", "Prospects List")
62+
meta = make_meta("success", "Prospects List (max 200)")
3463
result = {"meta": meta, "data": prospects}
3564
except Exception as e:
3665
import psycopg2
@@ -44,3 +73,12 @@ def root() -> dict:
4473
cur.close()
4574
conn.close()
4675
return result
76+
77+
78+
# New endpoint: /prospects/init
@router.get("/prospects/init")
def prospects_init() -> dict:
    """Initialize prospects (placeholder endpoint)"""
    payload = {"message": "This is a placeholder for prospects/init."}
    return {
        "meta": make_meta("success", "Initialized prospects (placeholder)"),
        "data": payload,
    }

app/api/routes.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,17 @@
1313
from app.api.root import router as root_router
1414
from app.api.health import router as health_router
1515

16-
from app.api.prospects.prospects import router as prospects_router
1716

18-
from app.api.prospects.seed import router as prospects_seed_router
19-
from app.api.prospects.empty import router as prospects_empty_router
17+
from app.api.prospects.prospects import router as prospects_router
18+
from app.api.prospects.database.alter import router as prospects_alter_router
19+
from app.api.prospects.database.seed import router as prospects_seed_router
20+
from app.api.prospects.database.empty import router as prospects_empty_router
21+
from app.api.prospects.database.process import router as prospects_process_router
2022

2123
router.include_router(root_router)
2224
router.include_router(health_router)
2325
router.include_router(prospects_router)
26+
router.include_router(prospects_alter_router)
2427
router.include_router(prospects_seed_router)
25-
router.include_router(prospects_empty_router)
28+
router.include_router(prospects_empty_router)
29+
router.include_router(prospects_process_router)

0 commit comments

Comments
 (0)