Skip to content

Commit 906f30f

Browse files
committed
fix: lint S3/run.py, SQS multi-record parse, BIP-0042 tests and CI job
1 parent 5ca9739 commit 906f30f

4 files changed

Lines changed: 170 additions & 90 deletions

File tree

.github/workflows/python-package.yml

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,35 @@ jobs:
6969
7070
- name: Install dependencies
7171
run: |
72-
python -m pip install -e ".[tests,tracking-client,graphviz,tracking-server-s3]"
72+
python -m pip install -e ".[tests,tracking-client,graphviz]"
7373
7474
- name: Run tests
7575
run: |
7676
python -m pytest tests --ignore=tests/integrations/persisters
7777
78+
test-tracking-server-s3:
79+
runs-on: ubuntu-latest
80+
strategy:
81+
fail-fast: false
82+
matrix:
83+
python-version: ['3.9', '3.10', '3.11', '3.12']
84+
85+
steps:
86+
- uses: actions/checkout@v4
87+
88+
- name: Set up Python ${{ matrix.python-version }}
89+
uses: actions/setup-python@v4
90+
with:
91+
python-version: ${{ matrix.python-version }}
92+
93+
- name: Install dependencies
94+
run: |
95+
python -m pip install -e ".[tests,tracking-client,tracking-server-s3]"
96+
97+
- name: Run S3 tracking server tests
98+
run: |
99+
python -m pytest tests/tracking/test_bip0042_s3_buffering.py -v
100+
78101
validate-examples:
79102
runs-on: ubuntu-latest
80103
steps:

burr/tracking/server/run.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -186,22 +186,15 @@ def get_app_spec():
186186

187187
if app_spec.indexing:
188188
# Only use polling when not in event-driven mode
189-
if not (
190-
isinstance(backend, EventDrivenBackendMixin) and backend.is_event_driven()
191-
):
192-
update_interval = (
193-
backend.update_interval_milliseconds() / 1000 if app_spec.indexing else None
194-
)
189+
_event_driven = isinstance(backend, EventDrivenBackendMixin) and backend.is_event_driven()
190+
if not _event_driven:
195191
sync_index = repeat_every(
196192
seconds=backend.update_interval_milliseconds() / 1000,
197193
wait_first=True,
198194
logger=logger,
199195
)(sync_index)
200196

201197
if app_spec.snapshotting:
202-
snapshot_interval = (
203-
backend.snapshot_interval_milliseconds() / 1000 if app_spec.snapshotting else None
204-
)
205198
save_snapshot = repeat_every(
206199
seconds=backend.snapshot_interval_milliseconds() / 1000,
207200
wait_first=True,

burr/tracking/server/s3/backend.py

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@
3333
import fastapi
3434
import pydantic
3535
from aiobotocore import session
36-
from pydantic import field_validator
3736
from fastapi import FastAPI
37+
from pydantic import field_validator
3838
from pydantic_settings import BaseSettings
3939
from tortoise import functions, transactions
4040
from tortoise.contrib.fastapi import RegisterTortoise
@@ -205,7 +205,40 @@ def timestamp_to_reverse_alphabetical(timestamp: datetime) -> str:
205205
return inverted_str + "-" + timestamp.isoformat()
206206

207207

208-
class SQLiteS3Backend(BackendBase, IndexingBackendMixin, SnapshottingBackendMixin, EventDrivenBackendMixin):
208+
def _parse_sqs_message_events(
209+
body: dict,
210+
) -> Optional[List[Tuple[str, datetime.datetime]]]:
211+
"""Parse EventBridge-wrapped or native S3 notification bodies from SQS.
212+
213+
Returns None if the format is not recognized. Multiple S3 records in one
214+
message yield one tuple per record.
215+
"""
216+
if "detail" in body:
217+
return [
218+
(
219+
body["detail"]["object"]["key"],
220+
datetime.datetime.fromisoformat(body["time"].replace("Z", "+00:00")),
221+
)
222+
]
223+
if "Records" in body:
224+
out: List[Tuple[str, datetime.datetime]] = []
225+
for record in body["Records"]:
226+
out.append(
227+
(
228+
record["s3"]["object"]["key"],
229+
datetime.datetime.fromisoformat(record["eventTime"].replace("Z", "+00:00")),
230+
)
231+
)
232+
return out
233+
return None
234+
235+
236+
class SQLiteS3Backend(
237+
BackendBase,
238+
IndexingBackendMixin,
239+
SnapshottingBackendMixin,
240+
EventDrivenBackendMixin,
241+
):
209242
def __init__(
210243
self,
211244
s3_bucket: str,
@@ -790,27 +823,14 @@ async def start_event_consumer(self) -> None:
790823
for message in messages:
791824
try:
792825
body = json.loads(message["Body"])
793-
s3_key = None
794-
event_time = None
795-
796-
# Handle EventBridge wrapped S3 events
797-
if "detail" in body:
798-
s3_key = body["detail"]["object"]["key"]
799-
event_time = datetime.datetime.fromisoformat(
800-
body["time"].replace("Z", "+00:00")
801-
)
802-
elif "Records" in body:
803-
record = body["Records"][0]
804-
s3_key = record["s3"]["object"]["key"]
805-
event_time = datetime.datetime.fromisoformat(
806-
record["eventTime"].replace("Z", "+00:00")
807-
)
808-
else:
809-
logger.warning(f"Unknown message format: {body}")
826+
events = _parse_sqs_message_events(body)
827+
if events is None:
828+
logger.warning("Unknown message format: %s", body)
810829
continue
811830

812-
if s3_key and s3_key.endswith(".jsonl"):
813-
await self._handle_s3_event(s3_key, event_time)
831+
for s3_key, event_time in events:
832+
if s3_key and s3_key.endswith(".jsonl"):
833+
await self._handle_s3_event(s3_key, event_time)
814834

815835
await sqs_client.delete_message(
816836
QueueUrl=self._sqs_queue_url,
@@ -865,7 +885,6 @@ async def indexing_jobs(
865885

866886
if __name__ == "__main__":
867887
os.environ["BURR_LOAD_SNAPSHOT_ON_START"] = "True"
868-
import asyncio
869888

870889
be = SQLiteS3Backend.from_settings(S3Settings())
871890
# coro = be.snapshot() # save to s3

0 commit comments

Comments (0)