Skip to content

Commit 42edc2c

Browse files
committed
Batch LinkedIn import and add rename endpoint
Improve queue API: make LinkedIn CSV import robust and batched, add column-rename endpoints, and return more recent records. Changes include: switch import to linkedin.csv with dynamic header detection, skip blank rows, batch inserts (500) with an added 'group' value and imported count returned; update get route to return 10 most-recent records instead of one; add new rename/rename_column route handlers to ALTER TABLE RENAME COLUMN with error handling; wire up the rename router in queue __init__.py and add a sample queue_output.txt. These updates improve import reliability and provide a programmatic column-rename API.
1 parent 227e74e commit 42edc2c

6 files changed

Lines changed: 163 additions & 30 deletions

File tree

app/api/queue/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,15 @@
99
from .routes.create import router as create_router
1010

1111
from .routes.import_linkedin import router as import_linkedin_router
12+
1213
from .routes.alter import router as alter_router
14+
from .routes.rename_column import router as rename_router
1315

1416
router = APIRouter()
1517
router.include_router(drop_router)
1618
router.include_router(empty_router)
1719
router.include_router(get_router)
1820
router.include_router(create_router)
1921
router.include_router(import_linkedin_router)
20-
router.include_router(alter_router)
22+
router.include_router(alter_router)
23+
router.include_router(rename_router)

app/api/queue/routes/get.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,20 @@ def read_queue() -> dict:
2727
for row in cursor.fetchall()
2828
]
2929

30-
# 3. Get most recently updated record
31-
cursor.execute("SELECT * FROM queue ORDER BY updated DESC LIMIT 1;")
30+
# 3. Get the 10 most recently updated records
31+
cursor.execute("SELECT * FROM queue ORDER BY updated DESC LIMIT 10;")
3232
columns = [desc[0] for desc in cursor.description] if cursor.description else []
33-
row = cursor.fetchone()
34-
most_recent = dict(zip(columns, row)) if row and columns else None
33+
rows = cursor.fetchall()
34+
most_recent = [dict(zip(columns, row)) for row in rows] if rows and columns else []
3535

3636
conn.close()
3737

3838
return {
3939
"meta": make_meta("success", "Queue table info"),
4040
"data": {
41-
"queued": record_count,
42-
"most_recent": most_recent,
43-
# "schema": schema
41+
"queued": record_count,
42+
"most_recent": most_recent,
43+
# "schema": schema
4444
}
4545
}
4646
except Exception as e:

app/api/queue/routes/import_linkedin.py

Lines changed: 57 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,38 +9,73 @@
99

1010
@router.post("/queue/import/linkedin")
1111
def import_linkedin_csv() -> dict:
12-
"""POST /queue/import/linkedin: Import data from linkedin_sample.csv into the queue table."""
12+
"""POST /queue/import/linkedin: Import data from linkedin.csv into the queue table, robust for large files."""
1313
csv_path = os.path.join(os.path.dirname(__file__), "../csv/linkedin/linkedin_sample.csv")
1414
if not os.path.exists(csv_path):
15-
raise HTTPException(status_code=404, detail="linkedin_sample.csv not found")
15+
raise HTTPException(status_code=404, detail="linkedin.csv not found")
1616
try:
1717
conn = get_db_connection_direct()
1818
cursor = conn.cursor()
1919
with open(csv_path, newline='', encoding='utf-8') as csvfile:
20-
reader = csv.DictReader(row for row in csvfile if not row.startswith('Notes:'))
20+
# Find the header line dynamically
21+
header_line = None
22+
pre_data_lines = []
23+
while True:
24+
pos = csvfile.tell()
25+
line = csvfile.readline()
26+
if not line:
27+
break
28+
if line.strip().startswith("First Name,Last Name,URL,Email Address,Company,Position,Connected On"):
29+
header_line = line.strip()
30+
break
31+
pre_data_lines.append(line)
32+
if not header_line:
33+
raise HTTPException(status_code=400, detail="CSV header not found.")
34+
# Use DictReader with the found header
35+
fieldnames = header_line.split(",")
36+
reader = csv.DictReader(csvfile, fieldnames=fieldnames)
2137
now = int(time.time())
38+
batch = []
39+
batch_size = 500
40+
first_row = None
41+
imported_count = 0
2242
for row in reader:
23-
cursor.execute(
24-
"""
25-
INSERT INTO queue (first_name, last_name, url, email_address, company, position, connected_on, created, updated, hidden, collection)
26-
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
27-
""",
28-
[
29-
row.get('First Name'),
30-
row.get('Last Name'),
31-
row.get('URL'),
32-
row.get('Email Address'),
33-
row.get('Company'),
34-
row.get('Position'),
35-
row.get('Connected On'),
36-
now,
37-
now,
38-
False,
39-
'prospects'
40-
]
43+
# Skip any rows that are just blank or not data
44+
if not any(row.values()):
45+
continue
46+
if first_row is None:
47+
first_row = row.copy()
48+
print("DEBUG: First parsed row from CSV:", first_row)
49+
batch.append([
50+
row.get('First Name'),
51+
row.get('Last Name'),
52+
row.get('URL'),
53+
row.get('Email Address'),
54+
row.get('Company'),
55+
row.get('Position'),
56+
row.get('Connected On'),
57+
now,
58+
now,
59+
False,
60+
'prospects',
61+
'linkedin'
62+
])
63+
imported_count += 1
64+
if len(batch) >= batch_size:
65+
cursor.executemany(
66+
'''INSERT INTO queue (first_name, last_name, url, email_address, company, position, connected_on, created, updated, hidden, collection, "group")
67+
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''',
68+
batch
69+
)
70+
batch = []
71+
if batch:
72+
cursor.executemany(
73+
'''INSERT INTO queue (first_name, last_name, url, email_address, company, position, connected_on, created, updated, hidden, collection, "group")
74+
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''',
75+
batch
4176
)
4277
conn.commit()
4378
conn.close()
44-
return {"meta": make_meta("success", "LinkedIn CSV imported")}
79+
return {"meta": make_meta("success", f"LinkedIn CSV imported (batched): {imported_count} records imported"), "imported": imported_count}
4580
except Exception as e:
4681
raise HTTPException(status_code=500, detail=str(e))

app/api/queue/routes/rename.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
from fastapi import APIRouter, HTTPException, Body
2+
from app.utils.make_meta import make_meta
3+
from app.utils.db import get_db_connection_direct
4+
5+
router = APIRouter()
6+
7+
@router.post("/queue/alter/rename_column")
8+
def rename_column(
9+
old_name: str = Body(..., embed=True),
10+
new_name: str = Body(..., embed=True),
11+
column_type: str = Body(..., embed=True)
12+
) -> dict:
13+
"""POST /queue/alter/rename-column: Rename a column in the queue table."""
14+
try:
15+
conn = get_db_connection_direct()
16+
cursor = conn.cursor()
17+
sql = f'ALTER TABLE queue RENAME COLUMN "{old_name}" TO "{new_name}";'
18+
cursor.execute(sql)
19+
conn.commit()
20+
conn.close()
21+
return {"meta": make_meta("success", f"Column '{old_name}' renamed to '{new_name}'")}
22+
except Exception as e:
23+
msg = str(e)
24+
if 'does not exist' in msg:
25+
raise HTTPException(status_code=400, detail=f"Column '{old_name}' does not exist in queue table.")
26+
if 'already exists' in msg:
27+
raise HTTPException(status_code=400, detail=f"Column '{new_name}' already exists in queue table.")
28+
raise HTTPException(status_code=500, detail=msg)
29+
from fastapi import APIRouter, HTTPException, Body
30+
from app.utils.make_meta import make_meta
31+
from app.utils.db import get_db_connection_direct
32+
33+
router = APIRouter()
34+
35+
@router.post("/queue/alter/rename_column")
36+
def rename_column(
37+
old_name: str = Body(..., embed=True),
38+
new_name: str = Body(..., embed=True),
39+
column_type: str = Body(..., embed=True)
40+
) -> dict:
41+
"""POST /queue/alter/rename-column: Rename a column in the queue table."""
42+
try:
43+
conn = get_db_connection_direct()
44+
cursor = conn.cursor()
45+
sql = f'ALTER TABLE queue RENAME COLUMN "{old_name}" TO "{new_name}";'
46+
cursor.execute(sql)
47+
conn.commit()
48+
conn.close()
49+
return {"meta": make_meta("success", f"Column '{old_name}' renamed to '{new_name}'")}
50+
except Exception as e:
51+
msg = str(e)
52+
if 'does not exist' in msg:
53+
raise HTTPException(status_code=400, detail=f"Column '{old_name}' does not exist in queue table.")
54+
if 'already exists' in msg:
55+
raise HTTPException(status_code=400, detail=f"Column '{new_name}' already exists in queue table.")
56+
raise HTTPException(status_code=500, detail=msg)
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from fastapi import APIRouter, HTTPException, Body
2+
from app.utils.make_meta import make_meta
3+
from app.utils.db import get_db_connection_direct
4+
5+
router = APIRouter()
6+
7+
@router.post("/queue/alter/rename_column")
8+
def rename_column(
9+
old_name: str = Body(..., embed=True),
10+
new_name: str = Body(..., embed=True),
11+
column_type: str = Body(..., embed=True)
12+
) -> dict:
13+
"""POST /queue/alter/rename_column: Rename a column in the queue table."""
14+
try:
15+
conn = get_db_connection_direct()
16+
cursor = conn.cursor()
17+
sql = f'ALTER TABLE queue RENAME COLUMN "{old_name}" TO "{new_name}";'
18+
cursor.execute(sql)
19+
conn.commit()
20+
conn.close()
21+
return {"meta": make_meta("success", f"Column '{old_name}' renamed to '{new_name}'")}
22+
except Exception as e:
23+
msg = str(e)
24+
if 'does not exist' in msg:
25+
raise HTTPException(status_code=400, detail=f"Column '{old_name}' does not exist in queue table.")
26+
if 'already exists' in msg:
27+
raise HTTPException(status_code=400, detail=f"Column '{new_name}' already exists in queue table.")
28+
raise HTTPException(status_code=500, detail=msg)

queue_output.txt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
id,first_name,last_name,url,email_address,company,position,connected_on,created,updated,hidden,collection,group
2+
22,,,,,,,,1776513092,1776513092,False,prospects,linkedin
3+
23,,,,,,,,1776513092,1776513092,False,prospects,linkedin
4+
24,,,,,,,,1776513092,1776513092,False,prospects,linkedin
5+
25,,,,,,,,1776513092,1776513092,False,prospects,linkedin
6+
26,,,,,,,,1776513092,1776513092,False,prospects,linkedin
7+
27,,,,,,,,1776513092,1776513092,False,prospects,linkedin
8+
28,,,,,,,,1776513092,1776513092,False,prospects,linkedin
9+
29,,,,,,,,1776513092,1776513092,False,prospects,linkedin
10+
30,,,,,,,,1776513092,1776513092,False,prospects,linkedin
11+
21,,,,,,,,1776513092,1776513092,False,prospects,linkedin

0 commit comments

Comments
 (0)