Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -54,6 +55,8 @@ public class SkillBox implements StateModule {
private SkillFileFilter fileFilter;
private boolean autoUploadSkill = true;

private static final ConcurrentHashMap<String, Object> FILE_LOCKS = new ConcurrentHashMap<>();

public SkillBox(Toolkit toolkit) {
this(toolkit, null, null);
}
Expand Down Expand Up @@ -759,13 +762,19 @@ public void uploadSkillFiles() {
if (targetPath.getParent() != null) {
Files.createDirectories(targetPath.getParent());
}
if (content.startsWith(BASE64_PREFIX)) {
String encoded = content.substring(BASE64_PREFIX.length());
byte[] decoded = Base64.getDecoder().decode(encoded);
Files.write(targetPath, decoded);
} else {
Files.writeString(targetPath, content, StandardCharsets.UTF_8);

Object lock =
FILE_LOCKS.computeIfAbsent(targetPath.toString(), k -> new Object());
synchronized (lock) {
if (content.startsWith(BASE64_PREFIX)) {
String encoded = content.substring(BASE64_PREFIX.length());
byte[] decoded = Base64.getDecoder().decode(encoded);
Files.write(targetPath, decoded);
} else {
Files.writeString(targetPath, content, StandardCharsets.UTF_8);
}
}

logger.debug("Uploaded file: {}", targetPath);
fileCount++;
} catch (IOException | IllegalArgumentException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,19 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
Expand Down Expand Up @@ -747,6 +753,98 @@ void testPreserveCustomValidator() {
// Verify validator was preserved
assertEquals(customValidator, shellTool.getCommandValidator());
}

@Test
@DisplayName(
"Should safely upload skill files concurrently across multiple SkillBox instances")
void testConcurrentUploadSkillFiles() throws InterruptedException {
int threadCount = 10;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);

try {
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch doneLatch = new CountDownLatch(threadCount);

// Shared working directory for all SkillBox instances
String sharedWorkDir = tempDir.resolve("concurrent-upload").toString();

// Create a relatively large payload to increase the probability of write collisions
String largeContent = "A".repeat(100 * 1024);
Map<String, String> resources = new HashMap<>();
resources.put("scripts/heavy_worker.py", largeContent);
AgentSkill sharedSkill =
new AgentSkill(
"concurrent_skill", "Concurrent Skill", "Content", resources);

List<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());

for (int i = 0; i < threadCount; i++) {
executor.submit(
() -> {
try {
// Simulate multiple isolated agents, each with its own Toolkit
// and
// SkillBox
Toolkit localToolkit = new Toolkit();
SkillBox localSkillBox = new SkillBox(localToolkit);

// Point all of them to the exact same shared physical directory
localSkillBox
.codeExecution()
.workDir(sharedWorkDir)
.withShell()
.withRead()
.withWrite()
.enable();

localSkillBox.registerSkill(sharedSkill);

startLatch.await();

// Concurrent execution! (This would corrupt files or throw
// FileSystemException without the lock)
localSkillBox.uploadSkillFiles();
} catch (Exception e) {
exceptions.add(e);
} finally {
doneLatch.countDown();
}
});
}

startLatch.countDown();

// Wait up to 10 seconds for all threads to finish
assertTrue(
doneLatch.await(10, TimeUnit.SECONDS),
"Timeout waiting for concurrent uploads");
executor.shutdown();

assertTrue(
exceptions.isEmpty(),
"Concurrent execution threw exceptions: " + exceptions);

Path targetPath =
Path.of(sharedWorkDir)
.resolve("skills/concurrent_skill_custom/scripts/heavy_worker.py");
assertTrue(Files.exists(targetPath), "Target file should exist");

assertDoesNotThrow(
() -> {
String readContent = Files.readString(targetPath);
assertEquals(
largeContent.length(),
readContent.length(),
"File content should not be corrupted or truncated");
assertEquals(
largeContent,
readContent,
"File content exactly matches the original");
});
} finally {
executor.shutdownNow();
}
}
}

@Test
Expand Down
Loading