
Bedrock integration backup#677

Open
vaquarkhan wants to merge 6 commits into apache:main from vaquarkhan:bedrock-integration-backup

Conversation

Contributor

@vaquarkhan vaquarkhan commented Mar 16, 2026

Bedrock integration for Burr

Adds first-class support for Amazon Bedrock's Converse API to Burr, so users can build state machines and agents on top of Bedrock models (Claude, Titan, Llama, etc.) without writing boto3 plumbing themselves.

Architecture

The integration is structured in three layers: a pure "core" that knows how to talk to Bedrock, a thin mixin that adapts it to Burr's action interface, and two concrete action classes that plug into Burr's state machine.

Layer 1 - _BedrockCore (transport + request building). This is a plain Python class with no knowledge of Burr actions. It holds the configuration (model id, region, guardrail, inference config, retry count, optional injected client) and exposes two things: get_client(), which returns the bedrock-runtime boto3 client (either the one injected at construction time, or a new one built lazily with a Config(retries=...) on first use), and build_converse_request(state), which calls the user's input_mapper(state) and assembles the kwargs dict passed to converse() / converse_stream(): modelId, messages, inferenceConfig, and the optional system and guardrailConfig blocks. Guardrail validation lives here: if guardrail_id is set without an explicit guardrail_version, the constructor raises ValueError so nobody accidentally ships against an unpublished DRAFT.

Layer 2 - _BedrockBase (Burr adapter mixin). A small mixin that holds a _BedrockCore instance and exposes the three properties Burr actions need: reads, writes, and name. It exists purely to avoid duplicating constructor wiring and property boilerplate between the sync and streaming classes.

Layer 3 - concrete actions.

BedrockAction inherits from _BedrockBase and Burr's SingleStepAction. Its run_and_update() builds the Converse request, calls converse(), runs the response through _text_from_content_blocks() (which joins every text block in the response, so multi-block replies and mixed text/tool-use messages aren't silently truncated), and then uses _model_result_for_writes() to map the model output to whichever keys the user declared in writes — "response" gets the joined text, "usage" gets the token counts, "stop_reason" gets the Bedrock stop reason, and any other custom key also gets the joined text.
BedrockStreamingAction inherits from _BedrockBase and Burr's StreamingAction. Its stream_run() is a generator that iterates the Bedrock event stream, collecting text chunks into a list[str] (joined once at the end — not concatenated in a loop), and yields one dict per contentBlockDelta event plus a final complete: True payload. Its update() runs only on the final chunk and reuses the same _model_result_for_writes() helper, so streaming and non-streaming actions write state using identical logic.
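The streaming accumulation pattern can be sketched against a hand-rolled fake event stream. The real generator iterates the response of converse_stream(); the event shapes below follow Bedrock's Converse stream format, but the function body is illustrative, not the PR's code:

```python
# Collect text deltas into a list and join once at the end (O(n) total),
# yielding one dict per contentBlockDelta plus a final complete payload.

def stream_run(events):
    chunks = []  # joined once at the end, not concatenated in a loop
    for event in events:
        if "contentBlockDelta" in event:
            text = event["contentBlockDelta"]["delta"].get("text", "")
            chunks.append(text)
            yield {"delta": text}
    yield {"response": "".join(chunks), "complete": True}

fake_events = [
    {"contentBlockDelta": {"delta": {"text": "Hello, "}}},
    {"contentBlockDelta": {"delta": {"text": "world"}}},
    {"messageStop": {"stopReason": "end_turn"}},
]
results = list(stream_run(fake_events))
```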

Helper functions. Two pure functions sit at module scope and are shared by both actions: _text_from_content_blocks(blocks) tolerates non-text blocks (e.g. toolUse) and returns the joined text, and _model_result_for_writes(text, usage, stop_reason, writes) centralises the "which result goes into which state key" logic so the sync and streaming paths can't drift out of sync.

Cross-cutting concerns.

boto3 is imported inside a try/except that calls require_plugin(...) on failure, so installing Burr without the [bedrock] extra doesn't break anything until a user actually touches the module.
The BedrockAction / BedrockStreamingAction symbols are exposed via a module-level __getattr__ in __init__.py, giving callers the ergonomic from burr.integrations import BedrockAction without importing boto3 for every Burr user.
Errors from boto3 (ClientError) are logged at ERROR level and re-raised unchanged, so Burr's own retry/lifecycle hooks can handle them at the application level.
State machine shape. Both actions are drop-in nodes in a Burr graph. A typical chatbot wiring is human_input → bedrock_call → save_response → human_input, where bedrock_call is one of these two actions reading chat_history and writing response. For agents, bedrock_call is followed by a conditional transition into a tool-executor action and back again — the integration itself stays stateless across calls; all conversation and tool state lives in Burr's State.
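The chatbot shape above can be walked through in plain Python. This is not Burr's API, just a stand-in loop showing that all conversation state lives in the shared state while the Bedrock node itself stays stateless between calls:

```python
# Stand-in nodes for the human_input -> bedrock_call -> save_response loop.
# Each reads and writes a shared state dict, mimicking Burr's State.

def human_input(state, text):
    history = state.get("chat_history", [])
    state["chat_history"] = history + [{"role": "user", "text": text}]
    return state

def bedrock_call(state):
    # Stand-in for BedrockAction: echoes the last user turn instead of
    # calling converse(); it keeps no state of its own across calls.
    last = state["chat_history"][-1]["text"]
    state["response"] = f"model reply to: {last}"
    return state

def save_response(state):
    state["chat_history"].append(
        {"role": "assistant", "text": state["response"]}
    )
    return state

state = {}
for turn in ["hi", "bye"]:
    state = save_response(bedrock_call(human_input(state, turn)))
```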

Changes

  • burr/integrations/bedrock.py
    • BedrockAction — single-step action wrapping Bedrock converse(). Handles lazy client creation, retries, guardrails, inference config, and maps model output into state.
    • BedrockStreamingAction — streaming variant using converse_stream(), yields chunks incrementally and merges final text into state on completion.
    • Shared _BedrockBase + _BedrockCore to avoid duplication across the two classes.
    • Helpers _text_from_content_blocks (joins all text blocks, tolerates tool-use blocks) and _model_result_for_writes (dynamically maps user's writes keys to response fields).
  • burr/integrations/__init__.py — lazy exports for BedrockAction / BedrockStreamingAction so boto3 is only imported when the classes are actually used.
  • pyproject.toml — adds [bedrock] optional extra (boto3).
  • docs/reference/integrations/bedrock.rst — Sphinx reference docs with install and IAM setup notes.
  • docs/getting_started/install.rst — adds Bedrock to the install matrix.
  • examples/integrations/bedrock/ — runnable example with application() (sync) and streaming_application() (streaming) functions, plus README and requirements.
  • .github/workflows/python-package.yml — dedicated test-bedrock job so AWS deps stay out of the default test matrix.
  • tests/integrations/test_bip0042_bedrock.py — 23 unit tests covering imports, guardrail validation, sync/streaming interfaces, multi-block responses, custom writes keys, and error propagation.
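The lazy-export mechanism (PEP 562 module-level __getattr__) can be demonstrated with stub modules. The stubs below stand in for burr.integrations and its bedrock submodule; in the real package the deferred import is what pulls in boto3:

```python
import sys
import types

# Stub "heavy" submodule standing in for burr/integrations/bedrock.py.
heavy = types.ModuleType("integrations.bedrock")
heavy.BedrockAction = type("BedrockAction", (), {})
sys.modules["integrations.bedrock"] = heavy

# Stub package standing in for burr/integrations/__init__.py.
pkg = types.ModuleType("integrations")

def _lazy_getattr(name):
    # Only touch the bedrock module when a Bedrock symbol is requested.
    if name in ("BedrockAction", "BedrockStreamingAction"):
        bedrock = sys.modules["integrations.bedrock"]  # deferred import
        return getattr(bedrock, name)
    raise AttributeError(name)

pkg.__getattr__ = _lazy_getattr  # PEP 562 hook
sys.modules["integrations"] = pkg

from integrations import BedrockAction  # resolved via __getattr__
```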

Design notes

  • No boto3 import at module load — guarded by require_plugin, so installs without the [bedrock] extra still work.
  • Client injection — both actions accept a pre-built bedrock-runtime client for tests and distributed execution; otherwise the client is created lazily on first use.
  • Guardrail safety — setting guardrail_id without an explicit guardrail_version raises ValueError rather than silently defaulting to DRAFT.
  • Empty vs. default inference config — inference_config={} is respected; None falls back to {"maxTokens": 4096}.
  • Custom writes keys — both sync and streaming actions map the model's text output to whichever writes key the user declares (e.g. writes=["answer"]), with "response" kept for backwards compatibility.

How I tested this

  • Ran the full unit suite: 23/23 bedrock tests pass, broader Burr suite unchanged.
  • Ran the example against real Bedrock (Claude 3 Haiku, us-east-1) — both sync and streaming return valid output.
  • Exercised end-to-end scenarios: multi-turn chat with state-managed history, custom writes keys, multi-content-block responses, guardrail validation paths, and ClientError propagation.

Notes

  • Follow-ups I'd suggest as separate PRs:
    • Async variants (AsyncBedrockAction / AsyncBedrockStreamingAction) on top of AsyncStreamingAction.
    • Native toolConfig support in BedrockAction for building agents without going around the abstraction.
    • stream_result with typed state via pydantic integration.

Checklist

  • PR has an informative and human-readable title
  • Changes are limited to a single goal (Bedrock integration only — tracking/SQS work split out per earlier review)
  • Code passed pre-commit
  • New functionality is tested (23 unit tests + real-API smoke)
  • New functions are documented (docstrings + Sphinx reference)
  • Project documentation updated (docs/reference/integrations/bedrock.rst, docs/getting_started/install.rst, example README)

@vaquarkhan vaquarkhan force-pushed the bedrock-integration-backup branch from 995a1ba to f593ee3 on March 16, 2026 08:28
Collaborator

@andreahlert andreahlert left a comment


Two features in one PR; this would have been easier to review split in two. The Bedrock side is straightforward, but the SQS consumer needs work.

if s3_key and s3_key.endswith(".jsonl"):
    await self._handle_s3_event(s3_key, event_time)

await sqs_client.delete_message(
Collaborator


nit: s3_key = None and event_time = None are dead: they're assigned here but never read; the actual values come from s3_keys_with_times on line 855.


async def indexing_jobs(
self, offset: int = 0, limit: int = 100, filter_empty: bool = True
) -> Sequence[schema.IndexingJob]:
Collaborator


blocker: if _handle_s3_event raises on the Nth record of a multi-record SQS message, records 0..N-1 get re-indexed on retry with no dedup. Make _handle_s3_event idempotent (check if s3_path exists in LogFile before insert).
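The suggested fix can be sketched with an in-memory set standing in for the LogFile table lookup (names are illustrative, not the PR's code):

```python
# Idempotent event handling: skip records whose s3_path was already
# indexed, so a retried multi-record SQS message does not re-index
# records 0..N-1 that succeeded before the failure.

indexed_paths = set()  # stands in for "SELECT 1 FROM LogFile WHERE ..."

def handle_s3_event(s3_path, index_record):
    if s3_path in indexed_paths:
        return False  # already processed on a prior attempt: no-op
    index_record(s3_path)
    indexed_paths.add(s3_path)
    return True

calls = []
handle_s3_event("data/p1/a.jsonl", calls.append)
handle_s3_event("data/p1/a.jsonl", calls.append)  # simulated retry
```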

Comment thread burr/integrations/bedrock.py Outdated
self._name = name
self._region = region
self._guardrail_id = guardrail_id
self._guardrail_version = guardrail_version or "DRAFT"
Collaborator


guardrail_version or "DRAFT" silently defaults to DRAFT when only guardrail_id is set. Is that intentional? Feels risky for prod, someone could set a guardrail ID and not realize they're running against an unpublished draft. Same at line 191.

Comment thread burr/integrations/bedrock.py Outdated
return result, new_state


class BedrockStreamingAction(StreamingAction):
Collaborator


IMHO BedrockStreamingAction is a copy-paste of BedrockAction. __init__, _get_client, reads/writes/name properties are all identical, ~70 lines duplicated. Pull them into a _BedrockBase and let each subclass just implement its execution method.
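The suggested refactor, in skeleton form (method bodies are placeholders; only the shape matters):

```python
# Shared wiring lives in _BedrockBase; each subclass implements only
# its own execution method.

class _BedrockBase:
    def __init__(self, reads, writes, name):
        self._reads = reads
        self._writes = writes
        self._name = name

    @property
    def reads(self):
        return self._reads

    @property
    def writes(self):
        return self._writes

    @property
    def name(self):
        return self._name

class BedrockAction(_BedrockBase):
    def run_and_update(self, state):
        return {"response": "..."}, state  # placeholder

class BedrockStreamingAction(_BedrockBase):
    def stream_run(self, state):
        yield {"delta": "..."}  # placeholder
```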

Comment thread burr/integrations/bedrock.py Outdated
self._region = region
self._guardrail_id = guardrail_id
self._guardrail_version = guardrail_version or "DRAFT"
self._inference_config = inference_config or {"maxTokens": 4096}
Collaborator


just a small nit: inference_config or {"maxTokens": 4096} means inference_config={} gives you the default because empty dict is falsy. Use if inference_config is not None if you want to allow empty configs.
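A minimal demonstration of the falsy-default pitfall and the suggested fix:

```python
# `or` treats any falsy value as missing, so an empty dict silently
# becomes the default; an explicit None check respects it.

DEFAULT = {"maxTokens": 4096}

def with_or(cfg):
    return cfg or DEFAULT  # {} silently becomes the default

def with_is_none(cfg):
    return cfg if cfg is not None else DEFAULT  # {} is respected
```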

Comment thread .github/workflows/python-package.yml Outdated
- name: Install dependencies
run: |
python -m pip install -e ".[tests,tracking-client,graphviz]"
python -m pip install -e ".[tests,tracking-client,graphviz,tracking-server-s3]"
Collaborator


tracking-server-s3 in the main CI install and apache-burr[bedrock] in [tests] (pyproject.toml:101) means every contributor now pulls boto3, aiobotocore, tortoise-orm, aerich even for unrelated PRs. Keep AWS deps in a separate CI job and test group, like the existing test-persister-dbs pattern.

"""
if self._tracking_mode != TrackingMode.EVENT_DRIVEN or not self._sqs_queue_url:
logger.info("Event consumer not configured, skipping")
return
Collaborator


Is the data/{project}/... prefix structure guaranteed for all event sources? WDYT about getting the project name from data_file.prefix (you already parse it via DataFile.from_path one line above) instead of a raw split?

response = await sqs_client.receive_message(
QueueUrl=self._sqs_queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=self._sqs_wait_time_seconds,
Collaborator


Nice, auto-creating the project from the S3 event is a good call for the event-driven flow.

})
}

resource "aws_sqs_queue_redrive_policy" "main" {
Collaborator


+1 on the DLQ + redrive policy setup, clean separation into modules.

@skrawcz skrawcz mentioned this pull request Mar 28, 2026
7 tasks
@andreahlert andreahlert added kind/feature Net-new functionality area/integrations External integrations (LLMs, frameworks) labels Mar 30, 2026
@github-actions github-actions bot added area/storage Persisters, state storage area/tracking Telemetry, tracing, OpenTelemetry area/ci Workflows, build, release scripts area/examples Relates to /examples pr/needs-rebase Conflicts with main and removed pr/needs-rebase Conflicts with main labels Apr 11, 2026
@vaquarkhan vaquarkhan force-pushed the bedrock-integration-backup branch from 4c66bf7 to 0499dec on April 12, 2026 04:48
@github-actions github-actions bot added the area/website burr.apache.org website label Apr 12, 2026
Contributor Author

PR is ready for review. Also, the documentation GitHub Actions workflow reports "No jobs were run" / failed at startup on this commit:
https://github.com/apache/burr/actions/runs/24301015848

Could a committer confirm whether workflow approval is needed for fork PRs, or if there’s an infra issue with that workflow?

Contributor Author

@andreahlert @skrawcz please review the Bedrock changes we moved out to reduce the scope of the previous PR.

Contributor

@skrawcz skrawcz left a comment


Looks good. Can we get an example under /examples using the two actions? (And please also fix the pre-commit.)

Apply Black formatting to burr/integrations/bedrock.py for CI.

Add examples/integrations/bedrock with BedrockAction and BedrockStreamingAction, requirements and README, and list it in examples/README.
end-of-file-fixer expects a trailing newline on burr/examples so pre-commit run --all-files passes.
Contributor Author

@skrawcz @andreahlert this is ready for another look.

  • BedrockAction / BedrockStreamingAction refactored behind a shared _BedrockBase + _BedrockCore — no more duplication across sync and streaming.
  • Fixed streaming update() to respect custom writes keys (matches sync parity).
  • Multi-content-block responses are joined instead of dropping everything after the first block.
  • Streaming path uses list + "".join instead of O(n²) string concat.
  • Guardrail validation is explicit — no silent DRAFT default.
  • Runnable example under examples/integrations/bedrock/ with sync + streaming demos, README, requirements.
  • Added opt-in live Bedrock test suite at
