Skip to content

Commit f80d656

Browse files
bramweltclaude
andcommitted
fix(scripts): address PR review comments
- Add runtime deps to pyproject.toml so uv sync works - Use meta.client.batch_get_item on dynamodb resource - Catch KeyNotFoundError explicitly; count other exceptions as errors, not missing entries - Replace offset pagination with scroll API in reindex_votes to avoid 10k result window limit - Fix _source field name object_id -> id in audit script - Make API_BASE configurable via env var, defaulting to production URL - Fix docstring table names in reindex_groupsio - Regenerate uv.lock with new dependencies 🤖 Generated with [Claude Code](https://claude.com/claude-code) Issue: LFXV2-1371 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> Signed-off-by: Trevor Bramwell <tbramwell@linuxfoundation.org>
1 parent b48e881 commit f80d656

8 files changed

Lines changed: 446 additions & 60 deletions

File tree

scripts/reindexing/audit_opensearch.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@
3434
from urllib.parse import urlparse
3535

3636
from nats.aio.client import Client as NATS
37-
from opensearchpy import NotFoundError, OpenSearch
37+
from nats.js.errors import KeyNotFoundError
38+
from opensearchpy import OpenSearch
3839

3940

4041
SCROLL_SIZE = 500
@@ -176,17 +177,15 @@ async def audit_type(self, object_type: str, kv_bucket: str) -> Stats:
176177
for doc in docs:
177178
doc_id = doc["_id"] # OpenSearch document _id
178179
# Also check the `id` field in _source for the uuid
179-
source_id = doc.get("_source", {}).get("object_id") or doc_id
180+
source_id = doc.get("_source", {}).get("id") or doc_id
180181

181182
# Strip to bare UUID (last segment if namespaced)
182183
uuid = source_id.split(":")[-1] if ":" in source_id else source_id
183184

184185
try:
185-
entry = await kv.get(uuid)
186-
if entry is None:
187-
raise NotFoundError(404, "key not found", {})
186+
await kv.get(uuid)
188187
s.in_nats += 1
189-
except Exception:
188+
except KeyNotFoundError:
190189
s.missing_in_nats += 1
191190
s.missing_ids.append(doc_id)
192191

@@ -200,6 +199,9 @@ async def audit_type(self, object_type: str, kv_bucket: str) -> Stats:
200199
except Exception as e:
201200
s.errors += 1
202201
print(f" ERROR deleting {doc_id}: {e}")
202+
except Exception as e:
203+
s.errors += 1
204+
print(f" ERROR checking {doc_id}: {e}")
203205

204206
return s
205207

scripts/reindexing/pyproject.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,9 @@ version = "0.1.0"
77
description = "Add your description here"
88
readme = "README.md"
99
requires-python = ">=3.12"
10-
dependencies = []
10+
dependencies = [
11+
"boto3",
12+
"httpx",
13+
"nats-py",
14+
"opensearch-py",
15+
]

scripts/reindexing/reindex.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import boto3
2020
from boto3.dynamodb.conditions import Key
2121
from nats.aio.client import Client as NATS
22+
from nats.js.errors import KeyNotFoundError
2223

2324

2425
# Table configuration
@@ -246,14 +247,14 @@ def query_secondary_table(
246247
batch = parent_keys_list[i:i + 100]
247248
keys = [{config.primary_key: pk} for pk in batch]
248249

249-
response = self.dynamodb.batch_get_item(
250+
response = self.dynamodb.meta.client.batch_get_item(
250251
RequestItems={config.name: {"Keys": keys}}
251252
)
252253
items.extend(response.get("Responses", {}).get(config.name, []))
253254

254255
# Handle unprocessed keys
255256
while response.get("UnprocessedKeys"):
256-
response = self.dynamodb.batch_get_item(
257+
response = self.dynamodb.meta.client.batch_get_item(
257258
RequestItems=response["UnprocessedKeys"]
258259
)
259260
items.extend(response.get("Responses", {}).get(config.name, []))
@@ -292,11 +293,6 @@ async def check_and_reindex_entry(
292293
assert self.kv is not None, "NATS KV not connected"
293294
entry = await self.kv.get(kv_key)
294295

295-
if entry is None:
296-
print(f" ✗ Missing: {kv_key}")
297-
self.stats.add_missing(table_name, primary_key_value)
298-
return False
299-
300296
# Entry exists - trigger reindex if not in dry-run mode
301297
if not self.dry_run:
302298
# Update with the same value to trigger reindex
@@ -306,6 +302,10 @@ async def check_and_reindex_entry(
306302

307303
return True
308304

305+
except KeyNotFoundError:
306+
print(f" ✗ Missing: {kv_key}")
307+
self.stats.add_missing(table_name, primary_key_value)
308+
return False
309309
except Exception as e:
310310
print(f" ✗ Error checking {kv_key}: {e}")
311311
self.stats.add_error(table_name)

scripts/reindexing/reindex_committees.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@
2323

2424
import httpx
2525
from nats.aio.client import Client as NATS
26+
from nats.js.errors import KeyNotFoundError
2627

2728

28-
# API_BASE = "https://api-gw.platform.linuxfoundation.org/project-service/v2"
29-
API_BASE = "https://api-gw.dev.platform.linuxfoundation.org/project-service/v2"
29+
API_BASE = os.environ.get(
30+
"API_BASE",
31+
"https://api-gw.platform.linuxfoundation.org/project-service/v2",
32+
)
3033
KV_BUCKET = "v1-objects"
3134
COMMITTEE_KV_PREFIX = "platform-collaboration__c"
3235
MEMBER_KV_PREFIX = "platform-community__c"
@@ -112,11 +115,6 @@ async def reindex_entry(self, kv_prefix: str, item_id: str):
112115
try:
113116
assert self.kv is not None, "NATS KV not connected"
114117
entry = await self.kv.get(kv_key)
115-
if entry is None:
116-
print(f" MISSING: {kv_key}")
117-
s.missing += 1
118-
s.missing_keys.append(item_id)
119-
return
120118

121119
if not self.dry_run:
122120
await self.kv.put(kv_key, entry.value)
@@ -125,10 +123,12 @@ async def reindex_entry(self, kv_prefix: str, item_id: str):
125123
else:
126124
print(f" found: {kv_key}")
127125

126+
except KeyNotFoundError:
127+
print(f" MISSING: {kv_key}")
128+
s.missing += 1
129+
s.missing_keys.append(item_id)
128130
except Exception as e:
129131
print(f" ERROR {kv_key}: {e}")
130-
s = self._stats(kv_prefix)
131-
s.missing += 1
132132
s.errors += 1
133133

134134
async def run(self, project_id: str):

scripts/reindexing/reindex_groupsio.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
Groups.io Service Reindexer
66
77
Given a project_id, this script:
8-
1. Looks up the groupsio service entry in itx-v2-groupsio-service
9-
2. Finds all subgroups/mailing lists via itx-v2-groupsio-subgroup (parent_id_index)
8+
1. Looks up the groupsio service entry in itx-groupsio-v2-service
9+
2. Finds all subgroups/mailing lists via itx-groupsio-v2-subgroup (parent_id_index)
1010
3. Reindexes each subgroup entry in the v1-objects NATS KV bucket
11-
4. For each subgroup, finds all members via itx-v2-groupsio-member (group_id_index)
11+
4. For each subgroup, finds all members via itx-groupsio-v2-member (group_id_index)
1212
5. Reindexes each member entry in the v1-objects NATS KV bucket
1313
6. Prints a summary report
1414
"""
@@ -22,6 +22,7 @@
2222
import boto3
2323
from boto3.dynamodb.conditions import Key
2424
from nats.aio.client import Client as NATS
25+
from nats.js.errors import KeyNotFoundError
2526

2627

2728
SERVICE_TABLE = "itx-groupsio-v2-service"
@@ -97,11 +98,6 @@ async def reindex_entry(self, table_name: str, item_id: str):
9798
try:
9899
assert self.kv is not None, "NATS KV not connected"
99100
entry = await self.kv.get(kv_key)
100-
if entry is None:
101-
print(f" MISSING: {kv_key}")
102-
s.missing += 1
103-
s.missing_keys.append(item_id)
104-
return
105101

106102
if not self.dry_run:
107103
await self.kv.put(kv_key, entry.value)
@@ -110,9 +106,12 @@ async def reindex_entry(self, table_name: str, item_id: str):
110106
else:
111107
print(f" found: {kv_key}")
112108

109+
except KeyNotFoundError:
110+
print(f" MISSING: {kv_key}")
111+
s.missing += 1
112+
s.missing_keys.append(item_id)
113113
except Exception as e:
114114
print(f" ERROR {kv_key}: {e}")
115-
s.missing += 1
116115
s.errors += 1
117116

118117
async def run(self, project_id: str):

scripts/reindexing/reindex_past_meetings.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from urllib.parse import urlparse
2727

2828
from nats.aio.client import Client as NATS
29+
from nats.js.errors import KeyNotFoundError
2930
from opensearchpy import OpenSearch
3031

3132

@@ -145,11 +146,6 @@ async def reindex_entry(self, meeting_and_occurrence_id: str):
145146
try:
146147
assert self.kv is not None, "NATS KV not connected"
147148
entry = await self.kv.get(kv_key)
148-
if entry is None:
149-
print(f" MISSING: {kv_key}")
150-
self.stats.missing += 1
151-
self.stats.missing_keys.append(meeting_and_occurrence_id)
152-
return
153149

154150
if not self.dry_run:
155151
await self.kv.put(kv_key, entry.value)
@@ -158,9 +154,12 @@ async def reindex_entry(self, meeting_and_occurrence_id: str):
158154
else:
159155
print(f" found: {kv_key}")
160156

157+
except KeyNotFoundError:
158+
print(f" MISSING: {kv_key}")
159+
self.stats.missing += 1
160+
self.stats.missing_keys.append(meeting_and_occurrence_id)
161161
except Exception as e:
162162
print(f" ERROR {kv_key}: {e}")
163-
self.stats.missing += 1
164163
self.stats.errors += 1
165164

166165
async def run(self):

scripts/reindexing/reindex_votes.py

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import httpx
2222
from nats.aio.client import Client as NATS
23+
from nats.js.errors import KeyNotFoundError
2324

2425

2526
OPENSEARCH_URL = "http://localhost:9200"
@@ -38,36 +39,54 @@ class Stats:
3839

3940

4041
def fetch_all_by_type(client: httpx.Client, object_type: str) -> List[str]:
41-
"""Fetch all object_ids of the given type from OpenSearch using pagination."""
42-
ids = []
43-
offset = 0
42+
"""Fetch all object_ids of the given type from OpenSearch using scroll pagination."""
43+
ids: List[str] = []
44+
scroll_id = None
4445

45-
while True:
46+
try:
4647
response = client.post(
4748
f"{OPENSEARCH_URL}/{INDEX}/_search",
49+
params={"scroll": "1m"},
4850
json={
4951
"query": {"term": {"object_type": object_type}},
5052
"size": PAGE_SIZE,
51-
"from": offset,
5253
"_source": ["object_id"],
54+
"sort": ["_doc"],
5355
},
5456
)
5557
response.raise_for_status()
5658
data = response.json()
5759

58-
hits = data["hits"]["hits"]
59-
if not hits:
60-
break
60+
scroll_id = data.get("_scroll_id")
61+
hits = data.get("hits", {}).get("hits", [])
62+
63+
while hits:
64+
for hit in hits:
65+
oid = (hit.get("_source") or {}).get("object_id")
66+
if oid:
67+
ids.append(oid)
68+
69+
if not scroll_id:
70+
break
6171

62-
for hit in hits:
63-
oid = hit["_source"].get("object_id")
64-
if oid:
65-
ids.append(oid)
72+
response = client.post(
73+
f"{OPENSEARCH_URL}/_search/scroll",
74+
json={"scroll": "1m", "scroll_id": scroll_id},
75+
)
76+
response.raise_for_status()
77+
data = response.json()
6678

67-
total = data["hits"]["total"]["value"]
68-
offset += len(hits)
69-
if offset >= total:
70-
break
79+
scroll_id = data.get("_scroll_id")
80+
hits = data.get("hits", {}).get("hits", [])
81+
finally:
82+
if scroll_id:
83+
try:
84+
client.delete(
85+
f"{OPENSEARCH_URL}/_search/scroll",
86+
json={"scroll_id": [scroll_id]},
87+
)
88+
except Exception:
89+
pass
7190

7291
return ids
7392

@@ -79,11 +98,6 @@ async def reindex_entry(kv, kv_prefix: str, item_id: str, dry_run: bool, stats:
7998

8099
try:
81100
entry = await kv.get(kv_key)
82-
if entry is None:
83-
print(f" MISSING: {kv_key}")
84-
stats.missing += 1
85-
stats.missing_keys.append(item_id)
86-
return
87101

88102
if not dry_run:
89103
await kv.put(kv_key, entry.value)
@@ -92,9 +106,12 @@ async def reindex_entry(kv, kv_prefix: str, item_id: str, dry_run: bool, stats:
92106
else:
93107
print(f" found: {kv_key}")
94108

109+
except KeyNotFoundError:
110+
print(f" MISSING: {kv_key}")
111+
stats.missing += 1
112+
stats.missing_keys.append(item_id)
95113
except Exception as e:
96114
print(f" ERROR {kv_key}: {e}")
97-
stats.missing += 1
98115
stats.errors += 1
99116

100117

0 commit comments

Comments
 (0)