|
17 | 17 | - **Batching Chunks:** If rate limits are frequently hit, consider batching multiple chunks together and updating StreamIO less frequently. This reduces the number of API calls and better utilizes the allowed quota. |
18 | 18 | - **Skipping Redundant Updates:** In scenarios where only the latest chunk matters (e.g., replacing message content), it may be optimal to skip intermediate updates that failed due to rate limits and only update with the most recent chunk after the backoff period. |
19 | 19 | - **Inspect Rate Limit Headers:** Always inspect the `X-RateLimit-Remaining` and `X-RateLimit-Reset` headers in StreamIO responses to dynamically adjust backoff timing and avoid unnecessary retries. |
20 | | - - **Configurable Retry Policy:** In LangGraph, you can configure retry policies for nodes (see [How to add node retry policies](https://langchain-ai.github.io/langgraph/how-tos/node-retries/)), allowing for exponential backoff and custom retry logic on API errors like 429. |
21 | | - - ### Example Retry Policy in LangGraph |
22 | | - - Use the `RetryPolicy` when adding a node that updates StreamIO, specifying `initial_interval`, `backoff_factor`, `max_interval`, and `max_attempts`. |
23 | | - - Example: |
24 | | - ~~~python |
25 | | - from langgraph.pregel import RetryPolicy |
26 | | - builder.add_node( |
27 | | - "update_streamio", |
28 | | - update_streamio_fn, |
29 | | - retry=RetryPolicy(initial_interval=1.0, backoff_factor=2.0, max_interval=32.0, max_attempts=5) |
30 | | - ) |
31 | | - ~~~ |
32 | | - - This ensures that if a rate limit error occurs, the node will retry with exponential backoff, up to the specified maximum attempts. |
33 | 20 | - ### Summary |
34 | 21 | - When streaming from LangGraph to StreamIO, design your update logic to: |
35 | 22 | - Handle 429 errors with exponential backoff |
36 | 23 | - Consider batching or skipping redundant updates |
37 | | - - Use LangGraph's retry policies for robust error handling |
38 | 24 | - Monitor rate limit headers to optimize retry timing |
39 | 25 | - This approach balances responsiveness, efficiency, and compliance with StreamIO's rate limits. |
40 | 26 | - ## Algorithms |
|
91 | 77 | ``` |
92 | 78 | - Uses Stream's **partial-update** endpoint so you never overwrite undeclared fields ([Build an AI Assistant Using Python - getstream.io](https://getstream.io/blog/python-assistant/?utm_source=chatgpt.com)). |
93 | 79 | - Works with any LangGraph streaming mode; just adapt the buffer strategy for "replace" vs "append". |
94 | | - - ### Node-Level Retry Policy (optional) |
95 | | - - ```python |
96 | | - from langgraph.pregel import RetryPolicy |
97 | | - builder.add_node( |
98 | | - "update_streamio", |
99 | | - lambda state: stream_to_streamio(state["run_id"], state["msg_id"]), |
100 | | - retry=RetryPolicy(initial_interval=1.0, backoff_factor=2.0, |
101 | | - max_interval=32.0, max_attempts=5) |
102 | | - ) |
103 | | - |
104 | | - ``` |
105 | | - - This lets LangGraph itself re-invoke the node when a 429 bubbles up. ([Streaming](https://langchain-ai.github.io/langgraph/concepts/streaming/)) |
| 80 | + - ## Additional Context: Node-Level Retry Policies in LangGraph |
| 81 | + - **Configurable Retry Policy:** In LangGraph, you can configure retry policies for nodes (see [How to add node retry policies](https://langchain-ai.github.io/langgraph/how-tos/node-retries/)), allowing for exponential backoff and custom retry logic on API errors like 429. |
| 82 | + - **Note:** Node-level retry policies in LangGraph are only relevant if a LangGraph node is directly responsible for updating StreamIO. In the main scenario discussed above, streaming from LangGraph and updating StreamIO are decoupled, so node-level retry policies do not apply. If, however, you architect your graph such that a node performs the StreamIO update, you can use a retry policy as shown below. |
| 83 | + - ```python |
| 84 | + from langgraph.pregel import RetryPolicy |
| 85 | + builder.add_node( |
| 86 | + "update_streamio", |
| 87 | + lambda state: stream_to_streamio(state["run_id"], state["msg_id"]), |
| 88 | + retry=RetryPolicy(initial_interval=1.0, backoff_factor=2.0, |
| 89 | + max_interval=32.0, max_attempts=5) |
| 90 | + ) |
| 91 | + ``` |
| 92 | + - This lets LangGraph itself re-invoke the node when a 429 bubbles up. ([Streaming](https://langchain-ai.github.io/langgraph/concepts/streaming/)) |
0 commit comments