Skip to content

Commit 3168b6a

Browse files
authored
Merge pull request #103 from matanzwix/fix/thread-safety-eval-context
fix: preserve the context state during evaluate — fixes #90, #93
2 parents 0b40af3 + 2c75fd2 commit 3168b6a

2 files changed

Lines changed: 307 additions & 1 deletion

File tree

src/main/java/com/dashjoin/jsonata/Jsonata.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,17 @@ public void setEvaluateExitCallback(ExitCallback cb) {
129129
Object evaluate(Symbol expr, Object input, Frame environment) {
130130
// Thread safety:
131131
// Make sure each evaluate is executed on an instance per thread
132-
return getPerThreadInstance()._evaluate(expr, input, environment);
132+
Jsonata _this = getPerThreadInstance();
133+
// Save and restore the evaluation context so that nested
134+
// evaluations (e.g. $eval()) see the correct context.
135+
Object _input = _this.input;
136+
Frame _environment = _this.environment;
137+
try {
138+
return _this._evaluate(expr, input, environment);
139+
} finally {
140+
_this.input = _input;
141+
_this.environment = _environment;
142+
}
133143
}
134144

135145
Object _evaluate(Symbol expr, Object input, Frame environment) {
Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
package com.dashjoin.jsonata;
2+
3+
import static com.dashjoin.jsonata.Jsonata.jsonata;
4+
import static org.junit.jupiter.api.Assertions.*;
5+
6+
import java.util.*;
7+
import java.util.concurrent.*;
8+
import java.util.concurrent.atomic.AtomicInteger;
9+
10+
import org.junit.jupiter.api.Test;
11+
12+
/**
13+
* Thread safety and instance isolation tests for the Jsonata ThreadLocal fix.
14+
* Covers GitHub issues #90 and #93, plus high-throughput concurrent scenarios.
15+
*/
16+
public class ThreadSafetyTest {
17+
18+
@Test
19+
public void testTwoInstancesSameThread_bindingsDontLeak() {
20+
Jsonata exprA = jsonata("$test");
21+
Jsonata.Frame env = exprA.createFrame();
22+
env.bind("test", "value_from_A");
23+
24+
// Constructing exprB must NOT corrupt exprA's evaluation context
25+
Jsonata exprB = jsonata("$test");
26+
27+
Object resultA = exprA.evaluate("", env);
28+
Object resultB = exprB.evaluate("");
29+
30+
assertEquals("value_from_A", resultA, "exprA should see its own binding");
31+
assertNull(resultB, "exprB should NOT see exprA's binding");
32+
}
33+
34+
@Test
35+
public void testTwoInstancesSameThread_differentExpressions() {
36+
Jsonata exprA = jsonata("a");
37+
Jsonata exprB = jsonata("b");
38+
39+
assertEquals(1, exprA.evaluate(Map.of("a", 1, "b", 99)));
40+
assertEquals(2, exprB.evaluate(Map.of("a", 99, "b", 2)));
41+
// Re-evaluate exprA to confirm it still works correctly
42+
assertEquals(3, exprA.evaluate(Map.of("a", 3, "b", 99)));
43+
}
44+
45+
@Test
46+
public void testManyInstancesSameThread_interleaved() {
47+
Jsonata add = jsonata("a + b");
48+
Jsonata mul = jsonata("a * b");
49+
Jsonata evalExpr = jsonata("$eval('a')");
50+
51+
for (int i = 1; i <= 500; i++) {
52+
assertEquals(i + 1, add.evaluate(Map.of("a", i, "b", 1)),
53+
"add failed at iteration " + i);
54+
assertEquals(i * 2, mul.evaluate(Map.of("a", i, "b", 2)),
55+
"mul failed at iteration " + i);
56+
assertEquals(i, evalExpr.evaluate(Map.of("a", i)),
57+
"eval failed at iteration " + i);
58+
}
59+
}
60+
61+
@Test
62+
public void testEvalDeepContext() {
63+
// This is the exact reproduction from issue #90
64+
Jsonata expr = jsonata("$eval($.funcs.func)");
65+
Object input = Map.of(
66+
"funcs", Map.of("func", "$.a + $.b"),
67+
"a", 3,
68+
"b", 4
69+
);
70+
assertEquals(7, expr.evaluate(input));
71+
}
72+
73+
@Test
74+
public void testEvalWithSimplePath() {
75+
Jsonata expr = jsonata("$eval('a')");
76+
assertEquals(42, expr.evaluate(Map.of("a", 42)));
77+
}
78+
79+
@Test
80+
public void testEvalWithNestedPath() {
81+
Jsonata expr = jsonata("$eval('a.b.c')");
82+
assertEquals(99, expr.evaluate(
83+
Map.of("a", Map.of("b", Map.of("c", 99)))));
84+
}
85+
86+
@Test
87+
public void testNestedEval() {
88+
Jsonata expr = jsonata("$eval(\"$eval('a')\")");
89+
assertEquals(7, expr.evaluate(Map.of("a", 7)));
90+
}
91+
92+
@Test
93+
public void testEvalWithinPathStepUsesCurrentItemContext() {
94+
Jsonata expr = jsonata("items.$eval('a')");
95+
Object input = Map.of(
96+
"items", List.of(
97+
Map.of("a", 1),
98+
Map.of("a", 2)
99+
)
100+
);
101+
assertEquals(List.of(1, 2), expr.evaluate(input));
102+
}
103+
104+
@Test
105+
public void testEvalWithinFilterUsesCurrentItemContext() {
106+
Jsonata expr = jsonata("items[$eval('a') = 2].a");
107+
Object input = Map.of(
108+
"items", List.of(
109+
Map.of("a", 1),
110+
Map.of("a", 2)
111+
)
112+
);
113+
assertEquals(2, expr.evaluate(input));
114+
}
115+
116+
@Test
117+
public void testCachedInstanceConcurrentThreads() throws Exception {
118+
int threads = 10;
119+
int itersPerThread = 1000;
120+
Jsonata expr = jsonata("a + b");
121+
122+
CountDownLatch startGate = new CountDownLatch(1);
123+
ExecutorService pool = Executors.newFixedThreadPool(threads);
124+
AtomicInteger errorCount = new AtomicInteger(0);
125+
List<Future<?>> futures = new ArrayList<>();
126+
127+
for (int t = 0; t < threads; t++) {
128+
final int threadId = t;
129+
futures.add(pool.submit(() -> {
130+
try {
131+
startGate.await();
132+
} catch (InterruptedException e) { return; }
133+
134+
for (int i = 0; i < itersPerThread; i++) {
135+
Object result = expr.evaluate(Map.of("a", threadId, "b", 1));
136+
if (!Integer.valueOf(threadId + 1).equals(result)) {
137+
errorCount.incrementAndGet();
138+
}
139+
}
140+
}));
141+
}
142+
143+
startGate.countDown(); // release all threads simultaneously
144+
for (Future<?> f : futures) f.get(30, TimeUnit.SECONDS);
145+
pool.shutdown();
146+
147+
assertEquals(0, errorCount.get(),
148+
"Concurrent evaluation of cached instance produced wrong results");
149+
}
150+
151+
@Test
152+
public void testHighThroughputWithEval() throws Exception {
153+
int threads = 16;
154+
int itersPerThread = 2000;
155+
Jsonata expr = jsonata("$eval('a') + b");
156+
157+
CountDownLatch startGate = new CountDownLatch(1);
158+
ExecutorService pool = Executors.newFixedThreadPool(threads);
159+
AtomicInteger errorCount = new AtomicInteger(0);
160+
List<String> sampleErrors = Collections.synchronizedList(new ArrayList<>());
161+
List<Future<?>> futures = new ArrayList<>();
162+
163+
for (int t = 0; t < threads; t++) {
164+
final int threadId = t;
165+
futures.add(pool.submit(() -> {
166+
try {
167+
startGate.await();
168+
} catch (InterruptedException e) { return; }
169+
170+
int expected = threadId * 10 + threadId; // threadId * 11
171+
for (int i = 0; i < itersPerThread; i++) {
172+
try {
173+
Object result = expr.evaluate(Map.of("a", threadId * 10, "b", threadId));
174+
if (!Integer.valueOf(expected).equals(result)) {
175+
errorCount.incrementAndGet();
176+
if (sampleErrors.size() < 5) {
177+
sampleErrors.add("thread-" + threadId + " iter-" + i +
178+
": expected " + expected + " got " + result);
179+
}
180+
}
181+
} catch (Exception e) {
182+
errorCount.incrementAndGet();
183+
if (sampleErrors.size() < 5) {
184+
sampleErrors.add("thread-" + threadId + " iter-" + i +
185+
": " + e.getClass().getSimpleName() + ": " + e.getMessage());
186+
}
187+
}
188+
}
189+
}));
190+
}
191+
192+
startGate.countDown();
193+
for (Future<?> f : futures) f.get(60, TimeUnit.SECONDS);
194+
pool.shutdown();
195+
196+
assertEquals(0, errorCount.get(),
197+
"High-throughput eval errors: " + sampleErrors);
198+
}
199+
200+
@Test
201+
public void testCustomFunctionMultiThread() throws Exception {
202+
int threads = 10;
203+
int itersPerThread = 500;
204+
Jsonata expr = jsonata("$double(a)");
205+
expr.registerFunction("double", (Integer x) -> x * 2);
206+
207+
CountDownLatch startGate = new CountDownLatch(1);
208+
ExecutorService pool = Executors.newFixedThreadPool(threads);
209+
AtomicInteger errorCount = new AtomicInteger(0);
210+
List<Future<?>> futures = new ArrayList<>();
211+
212+
for (int t = 0; t < threads; t++) {
213+
final int threadId = t + 1; // 1-based to avoid $double(0)
214+
futures.add(pool.submit(() -> {
215+
try {
216+
startGate.await();
217+
} catch (InterruptedException e) { return; }
218+
219+
int expected = threadId * 2;
220+
for (int i = 0; i < itersPerThread; i++) {
221+
Object result = expr.evaluate(Map.of("a", threadId));
222+
if (!Integer.valueOf(expected).equals(result)) {
223+
errorCount.incrementAndGet();
224+
}
225+
}
226+
}));
227+
}
228+
229+
startGate.countDown();
230+
for (Future<?> f : futures) f.get(30, TimeUnit.SECONDS);
231+
pool.shutdown();
232+
233+
assertEquals(0, errorCount.get(),
234+
"Custom function multi-thread evaluation produced wrong results");
235+
}
236+
237+
@Test
238+
public void testCachedInstanceWithBindingsMultiThread() throws Exception {
239+
int threads = 8;
240+
int itersPerThread = 1000;
241+
Jsonata expr = jsonata("$myVar + a");
242+
243+
CountDownLatch startGate = new CountDownLatch(1);
244+
ExecutorService pool = Executors.newFixedThreadPool(threads);
245+
AtomicInteger errorCount = new AtomicInteger(0);
246+
List<Future<?>> futures = new ArrayList<>();
247+
248+
for (int t = 0; t < threads; t++) {
249+
final int threadId = t;
250+
Jsonata.Frame frame = expr.createFrame();
251+
frame.bind("myVar", threadId * 100);
252+
253+
futures.add(pool.submit(() -> {
254+
try {
255+
startGate.await();
256+
} catch (InterruptedException e) { return; }
257+
258+
int expected = threadId * 100 + threadId;
259+
for (int i = 0; i < itersPerThread; i++) {
260+
Object result = expr.evaluate(Map.of("a", threadId), frame);
261+
if (!Integer.valueOf(expected).equals(result)) {
262+
errorCount.incrementAndGet();
263+
}
264+
}
265+
}));
266+
}
267+
268+
startGate.countDown();
269+
for (Future<?> f : futures) f.get(30, TimeUnit.SECONDS);
270+
pool.shutdown();
271+
272+
assertEquals(0, errorCount.get(),
273+
"Cached instance with per-thread bindings produced wrong results");
274+
}
275+
276+
@Test
277+
public void testNowWithCachedInstance() throws Exception {
278+
Jsonata expr = jsonata("$now()");
279+
Object r1 = expr.evaluate(null);
280+
Thread.sleep(1100); // $now() has second-level precision
281+
Object r2 = expr.evaluate(null);
282+
assertNotNull(r1);
283+
assertNotNull(r2);
284+
assertNotEquals(r1, r2, "$now() should return different values on different calls");
285+
}
286+
287+
@Test
288+
public void testMillisWithCachedInstance() throws Exception {
289+
Jsonata expr = jsonata("$millis()");
290+
long r1 = ((Number) expr.evaluate(null)).longValue();
291+
Thread.sleep(10);
292+
long r2 = ((Number) expr.evaluate(null)).longValue();
293+
assertTrue(r2 > r1, "$millis() should advance across separate evaluations");
294+
}
295+
296+
}

0 commit comments

Comments
 (0)