Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions temporal-spring-ai/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,22 @@ public class MyTools {

Auto-detected and executed as Nexus operations, similar to activity stubs.

## Media in messages

If you attach media (images, audio, etc.) to a `UserMessage` or an `AssistantMessage`, prefer passing it by URI rather than raw bytes:

```java
// Good — only the URL crosses the activity boundary.
Media image = new Media(MimeTypeUtils.IMAGE_PNG, URI.create("https://cdn.example.com/pic.png"));

// Works, but size-limited — see below.
Media image = new Media(MimeTypeUtils.IMAGE_PNG, new ByteArrayResource(bytes));
```

Raw `byte[]` media gets serialized into every chat activity's input *and* result payload, which end up inside Temporal workflow history events. Server-side history events have a fixed 2 MiB size limit; to leave headroom for messages, tool definitions, and options, the plugin enforces a **1 MiB default cap** on inline media bytes and fails fast with an `IllegalArgumentException` pointing you at the URI alternative.

Override the cap by setting the system property `io.temporal.springai.maxMediaBytes` before your worker starts (pass a positive integer; `0` disables the check). For anything larger than a small thumbnail, the URI route is the right answer — have an activity write the bytes to blob storage, then pass only the URL into the conversation.

## Optional Integrations

Auto-configured when their dependencies are on the classpath:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ private ChatModelTypes.MediaContent fromMedia(Media media) {
if (media.getData() instanceof String uri) {
return new ChatModelTypes.MediaContent(mimeType, uri);
} else if (media.getData() instanceof byte[] data) {
ChatModelTypes.checkMediaSize(data);
return new ChatModelTypes.MediaContent(mimeType, data);
}
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ private ChatModelTypes.MediaContent toMediaContent(Media media) {
if (media.getData() instanceof String uri) {
return new ChatModelTypes.MediaContent(mimeType, uri);
} else if (media.getData() instanceof byte[] data) {
ChatModelTypes.checkMediaSize(data);
return new ChatModelTypes.MediaContent(mimeType, data);
}
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.temporal.failure.ApplicationFailure;
import java.time.Duration;
import java.util.List;
import javax.annotation.Nullable;
Expand All @@ -16,6 +17,50 @@
*/
public final class ChatModelTypes {

/**
* Maximum size, in bytes, of a single {@link MediaContent#data()} byte array carried across the
* chat activity boundary. Bytes above this threshold land inside workflow history events, which
* have a fixed 2 MiB per-event limit on the Temporal server. 1 MiB leaves headroom for the rest
* of a chat payload (messages, tool definitions, options).
*
* <p>Users who want to raise or lower the cap can set the system property {@code
* io.temporal.springai.maxMediaBytes} to a positive integer before the chat activity runs; values
* &lt;= 0 disable the guard entirely. For most workloads, pass media by URI instead — write the
* bytes to a binary store from an activity, and pass only the URL across the conversation.
*/
public static final long MAX_MEDIA_BYTES_IN_HISTORY =
Long.getLong("io.temporal.springai.maxMediaBytes", 1L * 1024 * 1024);

/** Failure type on the {@link ApplicationFailure} thrown by {@link #checkMediaSize(byte[])}. */
public static final String MEDIA_SIZE_EXCEEDED_FAILURE_TYPE = "MediaSizeExceeded";

/**
* Throws a non-retryable {@link ApplicationFailure} if {@code data} exceeds {@link
* #MAX_MEDIA_BYTES_IN_HISTORY}. Non-retryable because this is a permanent, programmer-level error
* — retrying the same oversized payload will never succeed, and using a plain {@link
* RuntimeException} here would cause the workflow task to be retried forever (or the activity to
* churn through its {@code maxAttempts}) rather than surfacing the real problem. The failure
* message points the caller at the URI-based {@code Media} constructor. Pass-through otherwise.
*/
public static void checkMediaSize(byte[] data) {
if (data == null) {
return;
}
long limit = MAX_MEDIA_BYTES_IN_HISTORY;
if (limit > 0 && data.length > limit) {
throw ApplicationFailure.newNonRetryableFailure(
"Media byte[] is "
+ data.length
+ " bytes, which exceeds the "
+ limit
+ "-byte limit for inline media in Temporal workflow history. Pass the media by "
+ "URI instead: store the bytes outside the workflow (e.g. S3) and construct "
+ "Media(mimeType, URI). Set the system property "
+ "'io.temporal.springai.maxMediaBytes' to override this limit (or 0 to disable).",
MEDIA_SIZE_EXCEEDED_FAILURE_TYPE);
}
}

private ChatModelTypes() {}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
package io.temporal.springai;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowException;
import io.temporal.client.WorkflowOptions;
import io.temporal.failure.ApplicationFailure;
import io.temporal.springai.activity.ChatModelActivityImpl;
import io.temporal.springai.model.ActivityChatModel;
import io.temporal.springai.model.ChatModelTypes;
import io.temporal.testing.TestWorkflowEnvironment;
import io.temporal.worker.Worker;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.net.URI;
import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.ai.chat.messages.AssistantMessage;
import org.springframework.ai.chat.messages.UserMessage;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.model.Generation;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.ai.content.Media;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;

/**
* Unit tests around {@link ChatModelTypes#checkMediaSize(byte[])} plus integration-style tests
* against a live TestWorkflowEnvironment to make sure the guard fires on both the inbound (workflow
* → activity) and outbound (activity → workflow) conversion paths.
*/
class MediaSizeGuardTest {

private static final String TASK_QUEUE = "test-spring-ai-media-size-guard";

private TestWorkflowEnvironment testEnv;
private WorkflowClient client;

@BeforeEach
void setUp() {
testEnv = TestWorkflowEnvironment.newInstance();
client = testEnv.getWorkflowClient();
}

@AfterEach
void tearDown() {
testEnv.close();
}

@Test
void checkMediaSize_smallPayload_passes() {
byte[] small = new byte[500 * 1024]; // 500 KiB, well under 1 MiB
assertDoesNotThrow(() -> ChatModelTypes.checkMediaSize(small));
}

@Test
void checkMediaSize_oversizedPayload_throwsNonRetryableApplicationFailure() {
byte[] big = new byte[(int) ChatModelTypes.MAX_MEDIA_BYTES_IN_HISTORY + 1];
ApplicationFailure ex =
assertThrows(ApplicationFailure.class, () -> ChatModelTypes.checkMediaSize(big));
assertTrue(ex.isNonRetryable(), "guard must throw a non-retryable ApplicationFailure");
assertEquals(ChatModelTypes.MEDIA_SIZE_EXCEEDED_FAILURE_TYPE, ex.getType());
String msg = ex.getOriginalMessage();
assertTrue(msg.contains("URI"), "message should point at the URI alternative: " + msg);
assertTrue(
msg.contains("io.temporal.springai.maxMediaBytes"),
"message should mention the override system property: " + msg);
}

@Test
void checkMediaSize_null_passes() {
assertDoesNotThrow(() -> ChatModelTypes.checkMediaSize(null));
}

@Test
void inboundPath_oversizedUserMessageMedia_failsTheWorkflow() {
// Workflow → activity direction: the workflow builds a Prompt with a huge byte[] media,
// ActivityChatModel.createActivityInput calls toMediaContent → checkMediaSize throws.
Worker worker = testEnv.newWorker(TASK_QUEUE);
worker.registerWorkflowImplementationTypes(BigInboundMediaWorkflowImpl.class);
worker.registerActivitiesImplementations(new ChatModelActivityImpl(new StubChatModel()));
testEnv.start();

ChatWorkflow workflow =
client.newWorkflowStub(
ChatWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());
WorkflowException ex = assertThrows(WorkflowException.class, () -> workflow.chat("hi"));
String message = rootMessage(ex);
assertTrue(
message.contains(ChatModelTypes.MEDIA_SIZE_EXCEEDED_FAILURE_TYPE)
|| message.contains("-byte limit"),
"expected size-guard failure, got: " + message);
}

@Test
void inboundPath_smallMedia_passes() {
Worker worker = testEnv.newWorker(TASK_QUEUE);
worker.registerWorkflowImplementationTypes(SmallInboundMediaWorkflowImpl.class);
worker.registerActivitiesImplementations(new ChatModelActivityImpl(new StubChatModel()));
testEnv.start();

ChatWorkflow workflow =
client.newWorkflowStub(
ChatWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());
assertEquals("pong", workflow.chat("hi"));
}

@Test
void inboundPath_uriMedia_passes_regardlessOfSize() {
// URI-based media is not subject to the byte[] guard — bytes stay out of workflow history.
Worker worker = testEnv.newWorker(TASK_QUEUE);
worker.registerWorkflowImplementationTypes(UriMediaWorkflowImpl.class);
worker.registerActivitiesImplementations(new ChatModelActivityImpl(new StubChatModel()));
testEnv.start();

ChatWorkflow workflow =
client.newWorkflowStub(
ChatWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());
assertEquals("pong", workflow.chat("hi"));
}

@Test
void outboundPath_assistantEchoesOversizedMedia_failsTheActivity() {
// Activity → workflow direction: the stub ChatModel returns an assistant message with a
// huge byte[] media, ChatModelActivityImpl.fromMedia → checkMediaSize throws.
Worker worker = testEnv.newWorker(TASK_QUEUE);
worker.registerWorkflowImplementationTypes(EchoMediaWorkflowImpl.class);
worker.registerActivitiesImplementations(
new ChatModelActivityImpl(new BigOutboundMediaChatModel()));
testEnv.start();

ChatWorkflow workflow =
client.newWorkflowStub(
ChatWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());
WorkflowException ex = assertThrows(WorkflowException.class, () -> workflow.chat("hi"));
String message = rootMessage(ex);
assertTrue(
message.contains("exceeds the") && message.contains("-byte limit"),
"expected size-guard failure on return path, got: " + message);
}

private static String rootMessage(Throwable t) {
Throwable cur = t;
while (cur.getCause() != null) {
cur = cur.getCause();
}
return cur.getMessage() == null ? "" : cur.getMessage();
}

@WorkflowInterface
public interface ChatWorkflow {
@WorkflowMethod
String chat(String message);
}

public static class BigInboundMediaWorkflowImpl implements ChatWorkflow {
@Override
public String chat(String message) {
byte[] big = new byte[(int) ChatModelTypes.MAX_MEDIA_BYTES_IN_HISTORY + 1];
UserMessage userMessage =
UserMessage.builder()
.text(message)
.media(List.of(new Media(MimeTypeUtils.IMAGE_PNG, new ByteArrayResource(big))))
.build();
ActivityChatModel chatModel = ActivityChatModel.forDefault();
return chatModel.call(new Prompt(List.of(userMessage))).getResult().getOutput().getText();
}
}

public static class SmallInboundMediaWorkflowImpl implements ChatWorkflow {
@Override
public String chat(String message) {
byte[] small = new byte[16 * 1024]; // 16 KiB
UserMessage userMessage =
UserMessage.builder()
.text(message)
.media(List.of(new Media(MimeTypeUtils.IMAGE_PNG, new ByteArrayResource(small))))
.build();
ActivityChatModel chatModel = ActivityChatModel.forDefault();
return chatModel.call(new Prompt(List.of(userMessage))).getResult().getOutput().getText();
}
}

public static class UriMediaWorkflowImpl implements ChatWorkflow {
@Override
public String chat(String message) {
UserMessage userMessage =
UserMessage.builder()
.text(message)
.media(
List.of(
new Media(
MimeTypeUtils.IMAGE_PNG, URI.create("https://cdn.example.com/huge.png"))))
.build();
ActivityChatModel chatModel = ActivityChatModel.forDefault();
return chatModel.call(new Prompt(List.of(userMessage))).getResult().getOutput().getText();
}
}

public static class EchoMediaWorkflowImpl implements ChatWorkflow {
@Override
public String chat(String message) {
ActivityChatModel chatModel = ActivityChatModel.forDefault();
return chatModel.call(new Prompt(message)).getResult().getOutput().getText();
}
}

/** Returns "pong" — used to verify non-failing paths. */
private static class StubChatModel implements ChatModel {
@Override
public ChatResponse call(Prompt prompt) {
return ChatResponse.builder()
.generations(List.of(new Generation(new AssistantMessage("pong"))))
.build();
}

@Override
public reactor.core.publisher.Flux<ChatResponse> stream(Prompt prompt) {
throw new UnsupportedOperationException();
}
}

/** Returns an assistant message carrying a huge byte[] media, to trip the outbound guard. */
private static class BigOutboundMediaChatModel implements ChatModel {
@Override
public ChatResponse call(Prompt prompt) {
byte[] big = new byte[(int) ChatModelTypes.MAX_MEDIA_BYTES_IN_HISTORY + 1];
AssistantMessage assistant =
AssistantMessage.builder()
.content("")
.media(List.of(new Media(MimeType.valueOf("image/png"), new ByteArrayResource(big))))
.build();
return ChatResponse.builder().generations(List.of(new Generation(assistant))).build();
}

@Override
public reactor.core.publisher.Flux<ChatResponse> stream(Prompt prompt) {
throw new UnsupportedOperationException();
}
}
}
Loading