Skip to content

Commit 618d2e8

Browse files
authored
Issue 664 cloud native aws (#666)
* feat: BIP-0042 Cloud-Native Architecture for Apache Burr on AWS (#664)
  - Event-driven SQS telemetry: S3 notifications to SQS, near-instant updates
  - Buffered S3 persistence: SpooledTemporaryFile fixes seek errors on large files
  - Native BedrockAction and BedrockStreamingAction for Bedrock integration
  - Terraform module: S3, SQS, IAM with dev/prod tfvars and tutorial
* feat: BIP-0042 Cloud-Native Architecture for Apache Burr on AWS (#664)
* feat: BIP-0042 Cloud-Native Architecture for Apache Burr on AWS (#664)
* BIP-0042 Cloud-Native Architecture for Apache Burr on AWS (#664) fixed build error
* chore: move Bedrock integration to separate PR and review comments fixed
* feat: add Bedrock integration (BIP-0042) as separate PR
* Revert "feat: add Bedrock integration (BIP-0042) as separate PR"
  This reverts commit 61673cd.
* fix: add tracking-server-s3 to CI deps, remove Bedrock (in separate PR), fix typo
* fix: lint S3/run.py, SQS multi-record parse, BIP-0042 tests and CI job

---------

Co-authored-by: vaquarkhan <vaquarkhan@users.noreply.github.com>
1 parent a7b856e commit 618d2e8

File tree

24 files changed

+1604
-21
lines changed

24 files changed

+1604
-21
lines changed

.github/workflows/python-package.yml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,29 @@ jobs:
7575
run: |
7676
python -m pytest tests --ignore=tests/integrations/persisters
7777
78+
test-tracking-server-s3:
  # Runs the S3 tracking-server buffering tests across the Python version matrix.
  runs-on: ubuntu-latest
  strategy:
    # Let the remaining matrix entries finish even if one Python version fails.
    fail-fast: false
    matrix:
      python-version: ['3.9', '3.10', '3.11', '3.12']

  steps:
    - uses: actions/checkout@v4

    - name: Set up Python ${{ matrix.python-version }}
      uses: actions/setup-python@v4
      with:
        python-version: ${{ matrix.python-version }}

    - name: Install dependencies
      # The tracking-server-s3 extra is installed alongside the test extras.
      run: |
        python -m pip install -e ".[tests,tracking-client,tracking-server-s3]"

    - name: Run S3 tracking server tests
      run: |
        python -m pytest tests/tracking/test_bip0042_s3_buffering.py -v
100+
78101
validate-examples:
79102
runs-on: ubuntu-latest
80103
steps:

.gitignore

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,3 +193,11 @@ burr/tracking/server/build
193193
examples/*/statemachine
194194
examples/*/*/statemachine
195195
.vscode
196+
197+
# Terraform (see also examples/deployment/aws/terraform/.gitignore)
198+
**/.terraform.lock.hcl
199+
examples/deployment/aws/terraform/.terraform/
200+
examples/deployment/aws/terraform/*.tfstate
201+
examples/deployment/aws/terraform/*.tfstate.*
202+
examples/deployment/aws/terraform/.terraform.tfstate.lock.info
203+
examples/deployment/aws/terraform/*.tfplan

burr/tracking/server/backend.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,31 @@ def snapshot_interval_milliseconds(self) -> Optional[int]:
162162
pass
163163

164164

165+
class EventDrivenBackendMixin(abc.ABC):
166+
"""Mixin for backends that support event-driven updates.
167+
168+
Enables backends to receive real-time notifications instead of polling
169+
for new files.
170+
"""
171+
172+
@abc.abstractmethod
173+
async def start_event_consumer(self):
174+
"""Start the event consumer for event-driven tracking.
175+
176+
This method should run indefinitely, processing event notifications
177+
from the configured message queue.
178+
"""
179+
pass
180+
181+
@abc.abstractmethod
182+
def is_event_driven(self) -> bool:
183+
"""Check if this backend is configured for event-driven updates.
184+
185+
:return: True if event-driven mode is enabled and configured, False otherwise
186+
"""
187+
pass
188+
189+
165190
class BackendBase(abc.ABC):
166191
async def lifespan(self, app: FastAPI):
167192
"""Quick tool to allow plugin to the app's lifecycle.

burr/tracking/server/run.py

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18+
import asyncio
1819
import importlib
1920
import logging
2021
import os
@@ -29,6 +30,7 @@
2930
from burr.tracking.server.backend import (
3031
AnnotationsBackendMixin,
3132
BackendBase,
33+
EventDrivenBackendMixin,
3234
IndexingBackendMixin,
3335
SnapshottingBackendMixin,
3436
)
@@ -134,9 +136,20 @@ async def lifespan(app: FastAPI):
134136
await backend.lifespan(app).__anext__()
135137
await sync_index() # this will trigger the repeat every N seconds
136138
await save_snapshot() # this will trigger the repeat every N seconds
139+
# Start event consumer for event-driven tracking when configured
140+
event_consumer_task = None
141+
if isinstance(backend, EventDrivenBackendMixin) and backend.is_event_driven():
142+
event_consumer_task = asyncio.create_task(backend.start_event_consumer())
137143
global initialized
138144
initialized = True
139145
yield
146+
# Graceful shutdown: cancel event consumer task
147+
if event_consumer_task is not None:
148+
event_consumer_task.cancel()
149+
try:
150+
await event_consumer_task
151+
except asyncio.CancelledError:
152+
pass
140153
await backend.lifespan(app).__anext__()
141154

142155

@@ -159,17 +172,16 @@ def _get_app_spec() -> BackendSpec:
159172
logger = logging.getLogger(__name__)
160173

161174
if app_spec.indexing:
162-
update_interval = backend.update_interval_milliseconds() / 1000 if app_spec.indexing else None
163-
sync_index = repeat_every(
164-
seconds=backend.update_interval_milliseconds() / 1000,
165-
wait_first=True,
166-
logger=logger,
167-
)(sync_index)
175+
# Only use polling when not in event-driven mode
176+
_event_driven = isinstance(backend, EventDrivenBackendMixin) and backend.is_event_driven()
177+
if not _event_driven:
178+
sync_index = repeat_every(
179+
seconds=backend.update_interval_milliseconds() / 1000,
180+
wait_first=True,
181+
logger=logger,
182+
)(sync_index)
168183

169184
if app_spec.snapshotting:
170-
snapshot_interval = (
171-
backend.snapshot_interval_milliseconds() / 1000 if app_spec.snapshotting else None
172-
)
173185
save_snapshot = repeat_every(
174186
seconds=backend.snapshot_interval_milliseconds() / 1000,
175187
wait_first=True,

burr/tracking/server/s3/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,9 @@ This will immediately start indexing your s3 bucket (or pick up from the last sn
5959

6060
To track your data, you use the S3TrackingClient. You pass the tracker to the `ApplicationBuilder`:
6161

62-
6362
```python
63+
from burr.tracking.s3client import S3TrackingClient
64+
6465
app = (
6566
ApplicationBuilder()
6667
.with_graph(graph)

0 commit comments

Comments
 (0)