Conversation
```python
if s3_key and s3_key.endswith(".jsonl"):
    await self._handle_s3_event(s3_key, event_time)
...
await sqs_client.delete_message(
```
nit: the `s3_key = None` and `event_time = None` assignments are dead code. They're assigned here but never read; the actual values come from `s3_keys_with_times` on line 855.
```python
async def indexing_jobs(
    self, offset: int = 0, limit: int = 100, filter_empty: bool = True
) -> Sequence[schema.IndexingJob]:
```
blocker: if `_handle_s3_event` raises on the Nth record of a multi-record SQS message, records 0..N-1 get re-indexed on retry with no dedup. Make `_handle_s3_event` idempotent (check whether the `s3_path` already exists in `LogFile` before inserting).
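A minimal sketch of the suggested idempotency guard. The names here (`handle_s3_event`, the exists/insert methods) are hypothetical, and an in-memory dict stands in for the real `LogFile` table:

```python
import asyncio


class FakeLogFileTable:
    """In-memory stand-in for the LogFile table used by the indexer."""

    def __init__(self):
        self._rows = {}

    async def exists(self, s3_path: str) -> bool:
        return s3_path in self._rows

    async def insert(self, s3_path: str, event_time: str) -> None:
        self._rows[s3_path] = event_time


async def handle_s3_event(table: FakeLogFileTable, s3_path: str, event_time: str) -> bool:
    """Index the file unless it was already indexed; returns True if inserted."""
    if await table.exists(s3_path):
        # Re-delivered record (e.g. a retried multi-record SQS message): skip.
        return False
    await table.insert(s3_path, event_time)
    return True
```

With this guard, re-processing records 0..N-1 after a failure on record N becomes a no-op instead of a duplicate index entry.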
```python
self._name = name
self._region = region
self._guardrail_id = guardrail_id
self._guardrail_version = guardrail_version or "DRAFT"
```
`guardrail_version or "DRAFT"` silently defaults to `DRAFT` when only `guardrail_id` is set. Is that intentional? It feels risky for prod: someone could set a guardrail ID and not realize they're running against an unpublished draft. Same at line 191.
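For illustration, the stricter alternative the comment hints at could look like this (the helper name is hypothetical, not from the PR):

```python
def resolve_guardrail_version(guardrail_id, guardrail_version):
    """Illustrative only: fail loudly instead of silently defaulting to DRAFT."""
    if guardrail_id is not None and guardrail_version is None:
        raise ValueError(
            "guardrail_id is set but guardrail_version is not; "
            "pass an explicit version (or 'DRAFT' if you really mean the draft)"
        )
    return guardrail_version
```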
```python
return result, new_state
...
class BedrockStreamingAction(StreamingAction):
```
IMHO `BedrockStreamingAction` is a copy-paste of `BedrockAction`: `__init__`, `_get_client`, and the `reads`/`writes`/`name` properties are all identical, roughly 70 duplicated lines. Pull them into a `_BedrockBase` and let each subclass implement just its execution method.
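A sketch of the suggested refactor. Class and attribute names mirror the snippets above, but the Burr base classes are omitted so the example stands alone:

```python
class _BedrockBase:
    """Shared constructor wiring and properties for both Bedrock actions."""

    def __init__(self, name, reads, writes, region,
                 guardrail_id=None, guardrail_version=None):
        self._name = name
        self._reads = reads
        self._writes = writes
        self._region = region
        self._guardrail_id = guardrail_id
        self._guardrail_version = guardrail_version

    @property
    def name(self):
        return self._name

    @property
    def reads(self):
        return self._reads

    @property
    def writes(self):
        return self._writes


class BedrockAction(_BedrockBase):
    def run_and_update(self, state):  # only the execution method differs
        ...


class BedrockStreamingAction(_BedrockBase):
    def stream_run(self, state):
        ...
```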
```python
self._region = region
self._guardrail_id = guardrail_id
self._guardrail_version = guardrail_version or "DRAFT"
self._inference_config = inference_config or {"maxTokens": 4096}
```
just a small nit: `inference_config or {"maxTokens": 4096}` means `inference_config={}` also gives you the default, because an empty dict is falsy. Use `if inference_config is not None` if you want to allow empty configs.
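The falsy-dict pitfall in two lines (function names are illustrative):

```python
DEFAULT_INFERENCE_CONFIG = {"maxTokens": 4096}

def with_or(inference_config):
    # Buggy: {} is falsy, so an explicit empty config is replaced by the default.
    return inference_config or DEFAULT_INFERENCE_CONFIG

def with_is_none(inference_config):
    # Correct: only None falls back to the default; {} is respected.
    return DEFAULT_INFERENCE_CONFIG if inference_config is None else inference_config
```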
```yaml
- name: Install dependencies
  run: |
    python -m pip install -e ".[tests,tracking-client,graphviz]"
    python -m pip install -e ".[tests,tracking-client,graphviz,tracking-server-s3]"
```
`tracking-server-s3` in the main CI install and `apache-burr[bedrock]` in `[tests]` (pyproject.toml:101) mean every contributor now pulls `boto3`, `aiobotocore`, `tortoise-orm`, and `aerich` even for unrelated PRs. Keep the AWS deps in a separate CI job and test group, like the existing `test-persister-dbs` pattern.
```python
"""
if self._tracking_mode != TrackingMode.EVENT_DRIVEN or not self._sqs_queue_url:
    logger.info("Event consumer not configured, skipping")
    return
```
Is the `data/{project}/...` prefix structure guaranteed for all event sources? WDYT about getting the project name from `data_file.prefix` (you already parse it via `DataFile.from_path` one line above) instead of a raw `split`?
```python
response = await sqs_client.receive_message(
    QueueUrl=self._sqs_queue_url,
    MaxNumberOfMessages=10,
    WaitTimeSeconds=self._sqs_wait_time_seconds,
```
Nice, auto-creating the project from the S3 event is a good call for the event-driven flow.
```hcl
  })
}
...
resource "aws_sqs_queue_redrive_policy" "main" {
```
+1 on the DLQ + redrive policy setup, clean separation into modules.
PR is ready for review. Also, the documentation GitHub Actions workflow reports "No jobs were run" / failed at startup on commit . Could a committer confirm whether workflow approval is needed for fork PRs, or whether there's an infra issue with that workflow?
@andreahlert @skrawcz please review the Bedrock changes we moved out to reduce the scope of the previous PR.
Apply Black formatting to burr/integrations/bedrock.py for CI. Add examples/integrations/bedrock with BedrockAction and BedrockStreamingAction, requirements and README, and list it in examples/README.
end-of-file-fixer expects a trailing newline on burr/examples so pre-commit run --all-files passes.
Made-with: Cursor
@skrawcz @andreahlert this is ready for another look.
Bedrock integration for Burr
Adds first-class support for Amazon Bedrock's Converse API to Burr, so users can build state machines and agents on top of Bedrock models (Claude, Titan, Llama, etc.) without writing boto3 plumbing themselves.
Architecture
The integration is structured in three layers: a pure "core" that knows how to talk to Bedrock, a thin mixin that adapts it to Burr's action interface, and two concrete action classes that plug into Burr's state machine.
Layer 1 - _BedrockCore (transport + request building). This is a plain Python class with no knowledge of Burr actions. It holds the configuration (model id, region, guardrail, inference config, retry count, optional injected client) and exposes two things. get_client() returns the bedrock-runtime boto3 client: either the one injected at construction time, or a new one built lazily with a Config(retries=...) on first use. build_converse_request(state) calls the user's input_mapper(state) and assembles the kwargs dict passed to converse() / converse_stream(): modelId, messages, inferenceConfig, and the optional system and guardrailConfig blocks. Guardrail validation lives here: if guardrail_id is set without an explicit guardrail_version, the constructor raises ValueError so nobody accidentally ships against an unpublished DRAFT.
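A simplified sketch of the request builder as described, written as a free function for clarity; the production signature and the shape returned by `input_mapper` may differ:

```python
def build_converse_request(model_id, input_mapper, state, inference_config,
                           guardrail_id=None, guardrail_version=None):
    """Assemble the kwargs dict for converse() / converse_stream()."""
    mapped = input_mapper(state)
    kwargs = {
        "modelId": model_id,
        "messages": mapped["messages"],
        "inferenceConfig": inference_config,
    }
    # system and guardrailConfig blocks are optional.
    if "system" in mapped:
        kwargs["system"] = mapped["system"]
    if guardrail_id is not None:
        kwargs["guardrailConfig"] = {
            "guardrailIdentifier": guardrail_id,
            "guardrailVersion": guardrail_version,
        }
    return kwargs
```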
Layer 2 - _BedrockBase (Burr adapter mixin). A small mixin that holds a _BedrockCore instance and exposes the three properties Burr actions need: reads, writes, and name. It exists purely to avoid duplicating constructor wiring and property boilerplate between the sync and streaming classes.
Layer 3 - concrete actions.
BedrockAction inherits from _BedrockBase and Burr's SingleStepAction. Its run_and_update() builds the Converse request, calls converse(), runs the response through _text_from_content_blocks() (which joins every text block in the response, so multi-block replies and mixed text/tool-use messages aren't silently truncated), and then uses _model_result_for_writes() to map the model output to whichever keys the user declared in writes — "response" gets the joined text, "usage" gets the token counts, "stop_reason" gets the Bedrock stop reason, and any other custom key also gets the joined text.
BedrockStreamingAction inherits from _BedrockBase and Burr's StreamingAction. Its stream_run() is a generator that iterates the Bedrock event stream, collecting text chunks into a list[str] (joined once at the end — not concatenated in a loop), and yields one dict per contentBlockDelta event plus a final complete: True payload. Its update() runs only on the final chunk and reuses the same _model_result_for_writes() helper, so streaming and non-streaming actions write state using identical logic.
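The chunk-collection pattern described above can be sketched independently of Burr. The event and payload shapes here are illustrative, modeled on Bedrock's contentBlockDelta events, not the shipped code:

```python
def stream_chunks(event_stream):
    """Collect text deltas into a list, yield one dict per delta,
    then a final payload with the joined text (joined once, not in a loop)."""
    parts = []
    for event in event_stream:
        delta = event.get("contentBlockDelta", {}).get("delta", {})
        if "text" in delta:
            parts.append(delta["text"])
            yield {"delta": delta["text"]}
    yield {"response": "".join(parts), "complete": True}
```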
Helper functions. Two pure functions sit at module scope and are shared by both actions: _text_from_content_blocks(blocks) tolerates non-text blocks (e.g. toolUse) and returns the joined text, and _model_result_for_writes(text, usage, stop_reason, writes) centralises the "which result goes into which state key" logic so the sync and streaming paths can't drift out of sync.
Cross-cutting concerns.
boto3 is imported inside a try/except that calls require_plugin(...) on failure, so installing Burr without the [bedrock] extra doesn't break anything until a user actually touches the module.
The BedrockAction / BedrockStreamingAction symbols are exposed via a module-level __getattr__ in __init__.py, giving callers the ergonomic from burr.integrations import BedrockAction without importing boto3 for every Burr user.
Errors from boto3 (ClientError) are logged at ERROR level and re-raised unchanged, so Burr's own retry/lifecycle hooks can handle them at the application level.
State machine shape. Both actions are drop-in nodes in a Burr graph. A typical chatbot wiring is human_input → bedrock_call → save_response → human_input, where bedrock_call is one of these two actions reading chat_history and writing response. For agents, bedrock_call is followed by a conditional transition into a tool-executor action and back again — the integration itself stays stateless across calls; all conversation and tool state lives in Burr's State.
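The human_input → bedrock_call → save_response loop can be simulated with plain functions, without pulling in Burr itself; the state keys (`chat_history`, `response`) follow the description above, while the echo model is a stand-in for the real Converse call:

```python
def human_input(state, user_message):
    """Append the user's message to chat_history."""
    history = state.get("chat_history", []) + [{"role": "user", "content": user_message}]
    return {**state, "chat_history": history}


def bedrock_call(state):
    """Stand-in for the real Converse call; echoes the last user message."""
    last = state["chat_history"][-1]["content"]
    return {**state, "response": f"echo: {last}"}


def save_response(state):
    """Persist the model reply into chat_history, closing the loop."""
    history = state["chat_history"] + [{"role": "assistant", "content": state["response"]}]
    return {**state, "chat_history": history}


state = save_response(bedrock_call(human_input({}, "hi")))
```

All conversation state lives in the `state` dict, mirroring how the real integration keeps everything in Burr's State rather than in the action objects.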
Changes
- `burr/integrations/bedrock.py`:
  - `BedrockAction` — single-step action wrapping Bedrock `converse()`. Handles lazy client creation, retries, guardrails, inference config, and maps model output into state.
  - `BedrockStreamingAction` — streaming variant using `converse_stream()`; yields chunks incrementally and merges the final text into state on completion.
  - `_BedrockBase` + `_BedrockCore` to avoid duplication across the two classes.
  - `_text_from_content_blocks` (joins all text blocks, tolerates tool-use blocks) and `_model_result_for_writes` (dynamically maps the user's `writes` keys to response fields).
- `burr/integrations/__init__.py` — lazy exports for `BedrockAction` / `BedrockStreamingAction` so `boto3` is only imported when the classes are actually used.
- `pyproject.toml` — adds a `[bedrock]` optional extra (boto3).
- `docs/reference/integrations/bedrock.rst` — Sphinx reference docs with install and IAM setup notes.
- `docs/getting_started/install.rst` — adds Bedrock to the install matrix.
- `examples/integrations/bedrock/` — runnable example with `application()` (sync) and `streaming_application()` (streaming) functions, plus README and requirements.
- `.github/workflows/python-package.yml` — dedicated `test-bedrock` job so AWS deps stay out of the default test matrix.
- `tests/integrations/test_bip0042_bedrock.py` — 23 unit tests covering imports, guardrail validation, sync/streaming interfaces, multi-block responses, custom `writes` keys, and error propagation.

Design notes
- `boto3` import at module load is guarded by `require_plugin`, so installs without the `[bedrock]` extra still work.
- An injected `bedrock-runtime` client is supported for tests and distributed execution; otherwise the client is created lazily on first use.
- `guardrail_id` without an explicit `guardrail_version` raises `ValueError` rather than silently defaulting to `DRAFT`.
- `inference_config={}` is respected; `None` falls back to `{"maxTokens": 4096}`.
- Custom `writes` keys — both sync and streaming actions map the model's text output to whichever `writes` key the user declares (e.g. `writes=["answer"]`), with `"response"` kept for backwards compatibility.

How I tested this
- Live run against Bedrock (`us-east-1`) — both sync and streaming return valid output.
- Unit tests cover custom `writes` keys, multi-content-block responses, guardrail validation paths, and `ClientError` propagation.

Notes
- Possible follow-up: async variants (`AsyncBedrockAction` / `AsyncBedrockStreamingAction`) on top of `AsyncStreamingAction`.
- `toolConfig` support in `BedrockAction` for building agents without going around the abstraction.
- `stream_result` with typed state via the `pydantic` integration.

Checklist
- Docs updated (`docs/reference/integrations/bedrock.rst`, `docs/getting_started/install.rst`, example README)