Skip to content

Commit 7c19b5a

Browse files
authored
Merge pull request #1180 from quickfix-j/copilot/analyze-differences-1171-1173
Add doc on threading model
2 parents 23610bb + a4afb8d commit 7c19b5a

3 files changed

Lines changed: 345 additions & 0 deletions

File tree

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ Here are explanations of what these functions provide for you.
111111

112112
The sample code below shows how you might start up a FIX acceptor which listens on a socket. If you wanted an initiator, you would simply replace the acceptor in this code fragment with a `SocketInitiator`. `ThreadedSocketInitiator` and `ThreadedSocketAcceptor` classes are also available. These will supply a thread to each session that is created. If you use these you must make sure your application is thread safe.
113113

114+
For a detailed description of the QuickFIX/J threading model — including the single-threaded vs. thread-per-session strategies, the timer thread, heartbeat management, queue back-pressure, and thread-safety implications for application developers — see [docs/threading-model.md](docs/threading-model.md) and [docs/threading-developer-guide.md‎](docs/threading-developer-guide.md).
115+
116+
114117
```Java
115118
import quickfix.*;
116119
import java.io.FileInputStream;

docs/threading-developer-guide.md

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
# QuickFIX/J Threading Model — Developer Guide
2+
3+
QuickFIX/J provides two threading strategies for processing FIX messages. The choice of strategy
4+
affects how your application handles concurrent sessions and what thread-safety guarantees you must
5+
provide.
6+
7+
## Threading Strategies
8+
9+
### Single-Threaded Strategy
10+
11+
**Classes:** `SocketAcceptor` / `SocketInitiator`
12+
13+
All sessions share a single message-processing thread (named `QFJ Message Processor`). Incoming
14+
messages from all sessions are placed in a shared queue and dispatched one at a time by this thread.
15+
16+
This means your `Application` callbacks (`fromApp`, `fromAdmin`, etc.) are always invoked from the
17+
same thread, so you do not need to make your application code thread-safe with respect to concurrent
18+
session callbacks. However, a slow callback will delay message processing for all other sessions.
19+
20+
**Use when:**
21+
- You have a small number of sessions.
22+
- Simplicity and predictable, sequential message processing are more important than throughput.
23+
- You want to avoid the complexity of thread-safe application code.
24+
25+
### Thread-Per-Session Strategy
26+
27+
**Classes:** `ThreadedSocketAcceptor` / `ThreadedSocketInitiator`
28+
29+
Each session gets its own dedicated message-dispatching thread. Incoming messages for a session are
30+
queued and processed by that session's thread independently of other sessions.
31+
32+
Because your `Application` callbacks can be invoked concurrently from multiple session threads, your
33+
application code **must be thread-safe**.
34+
35+
**Use when:**
36+
- You have multiple sessions and need them to process messages independently.
37+
- A slow or blocking callback for one session must not impact other sessions.
38+
- You can ensure your application implementation is thread-safe.
39+
40+
## Queue Capacity and Back-pressure
41+
42+
Both strategies support configuring the internal message queue capacity to control back-pressure:
43+
44+
```java
45+
// Fixed-capacity queue (blocks producers when full)
46+
Acceptor acceptor = new ThreadedSocketAcceptor(
47+
application, storeFactory, settings, logFactory, messageFactory,
48+
queueCapacity);
49+
50+
// Watermark-based flow control
51+
Acceptor acceptor = ThreadedSocketAcceptor.newBuilder()
52+
.withApplication(application)
53+
.withMessageStoreFactory(storeFactory)
54+
.withSettings(settings)
55+
.withLogFactory(logFactory)
56+
.withMessageFactory(messageFactory)
57+
.withQueueLowerWatermark(lowerWatermark)
58+
.withQueueUpperWatermark(upperWatermark)
59+
.build();
60+
```
61+
62+
The same constructors and builder options are available on `SocketAcceptor`, `SocketInitiator`, and
63+
`ThreadedSocketInitiator`.
64+
65+
## Choosing a Strategy
66+
67+
| | `SocketAcceptor` / `SocketInitiator` | `ThreadedSocketAcceptor` / `ThreadedSocketInitiator` |
68+
|---|---|---|
69+
| Message processing | Single shared thread | One thread per session |
70+
| Application thread-safety required | No | Yes |
71+
| Session isolation | No | Yes |
72+
| Typical use case | Few sessions, simple apps | Many sessions, independent processing |
73+
74+
## Example: Starting an Acceptor
75+
76+
```java
77+
import quickfix.*;
78+
import java.io.FileInputStream;
79+
80+
public class MyApp {
81+
82+
public static void main(String[] args) throws Exception {
83+
Application application = new MyApplication();
84+
SessionSettings settings = new SessionSettings(new FileInputStream(args[0]));
85+
MessageStoreFactory storeFactory = new FileStoreFactory(settings);
86+
LogFactory logFactory = new FileLogFactory(settings);
87+
MessageFactory messageFactory = new DefaultMessageFactory();
88+
89+
// Single-threaded: all sessions share one message-processing thread
90+
Acceptor acceptor = new SocketAcceptor(
91+
application, storeFactory, settings, logFactory, messageFactory);
92+
93+
// OR thread-per-session: each session has its own message-processing thread
94+
// (application must be thread-safe)
95+
// Acceptor acceptor = new ThreadedSocketAcceptor(
96+
// application, storeFactory, settings, logFactory, messageFactory);
97+
98+
acceptor.start();
99+
// ... run your application ...
100+
acceptor.stop();
101+
}
102+
}
103+
```
104+
105+
## Thread Safety Guidance
106+
107+
Regardless of which strategy you choose, note that `Session.sendToTarget()` is thread-safe and may
108+
be called from any thread to send outgoing messages.
109+
110+
When using `ThreadedSocketAcceptor` or `ThreadedSocketInitiator`, ensure that any shared state
111+
accessed in your `Application` implementation (e.g., order books, maps, counters) is properly
112+
synchronized or uses thread-safe data structures.
113+
114+
---
115+
116+
*For a deep technical reference on the threading internals, see [`threading-model.md`](./threading-model.md).*

docs/threading-model.md

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
# QuickFIX/J Threading Model
2+
3+
## 1. Overview
4+
5+
QuickFIX/J uses [Apache MINA](http://mina.apache.org/) for non-blocking I/O. The threading model for message processing is controlled by the `EventHandlingStrategy` interface (`quickfix.mina.EventHandlingStrategy`), with two concrete implementations:
6+
7+
- **`SingleThreadedEventHandlingStrategy`** — one thread processes messages for all sessions (`SocketAcceptor`, `SocketInitiator`)
8+
- **`ThreadPerSessionEventHandlingStrategy`** — one thread per session processes messages (`ThreadedSocketAcceptor`, `ThreadedSocketInitiator`)
9+
10+
Both strategies co-exist with the **timer thread**, which is always present and always calls `Session.next()` (no-arg) on a 1-second schedule, regardless of which event-handling strategy is in use.
11+
12+
---
13+
14+
## 2. Connector Classes and Their Strategy
15+
16+
| Connector class | Event handling strategy | Thread name(s) |
17+
|---|---|---|
18+
| `SocketAcceptor` | `SingleThreadedEventHandlingStrategy` | `QFJ Message Processor` |
19+
| `SocketInitiator` | `SingleThreadedEventHandlingStrategy` | `QFJ Message Processor` |
20+
| `ThreadedSocketAcceptor` | `ThreadPerSessionEventHandlingStrategy` | `QF/J Session dispatcher: <sessionID>` |
21+
| `ThreadedSocketInitiator` | `ThreadPerSessionEventHandlingStrategy` | `QF/J Session dispatcher: <sessionID>` |
22+
23+
---
24+
25+
## 3. Single-Threaded Model (`SingleThreadedEventHandlingStrategy`)
26+
27+
**Class:** `quickfix.mina.SingleThreadedEventHandlingStrategy`
28+
**Source:** `quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java`
29+
30+
- A single `BlockingQueue<SessionMessageEvent>` holds events from **all** sessions.
31+
- One background thread named **`QFJ Message Processor`** (a daemon thread) drains the queue and calls `session.next(message)` for each event via `SessionMessageEvent.processMessage()`.
32+
- The thread is started via `blockInThread()`, which creates a `ThreadAdapter` wrapping the `block()` loop.
33+
- `onMessage()` wraps incoming messages into a `SessionMessageEvent` and puts them on the shared queue.
34+
- The `block()` loop polls the queue with a timeout (`THREAD_WAIT_FOR_MESSAGE_MS`) so it can periodically check the `isStopped` flag.
35+
- On stop, remaining queued messages are drained and processed before the thread exits.
36+
- The `getQueueSize(SessionID)` method returns the total queue size (single queue for all sessions — there is no per-session view).
37+
38+
**Key point for application developers:** Because all sessions share a single processing thread, a slow `fromApp()` callback will delay processing for **all** other sessions.
39+
40+
---
41+
42+
## 4. Thread-per-Session Model (`ThreadPerSessionEventHandlingStrategy`)
43+
44+
**Class:** `quickfix.mina.ThreadPerSessionEventHandlingStrategy`
45+
**Source:** `quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java`
46+
47+
- A `ConcurrentHashMap<SessionID, MessageDispatchingThread>` maps each session to its own dispatcher thread.
48+
- On the first `onMessage()` call for a given session, a new `MessageDispatchingThread` is created and started via `startDispatcherThread()`.
49+
- Each `MessageDispatchingThread` has its own `BlockingQueue<Message>` (or watermark-tracked queue) and loops calling `session.next(message)`.
50+
- Thread name: **`QF/J Session dispatcher: <BeginString>:<SenderCompID>/<TargetCompID>`**
51+
- The `Executor` can be customised via `setExecutor()`. The default is `DedicatedThreadExecutor`, which creates a plain `new Thread(command, name).start()`.
52+
- On stop, `stopDispatcherThreads()` enqueues `END_OF_STREAM` to every dispatcher, sets `stopping=true`, and waits (polling every 100 ms) until all dispatchers report `isStopped`.
53+
- After a dispatcher drains its remaining queue on shutdown, it removes itself from the `dispatchers` map.
54+
55+
**Key point for application developers:** Since each session has its own thread, a slow `fromApp()` for one session does **not** block others. However, your `Application` implementation **must be thread-safe** if it shares state across sessions.
56+
57+
---
58+
59+
## 5. The Timer Thread and `Session.next()`
60+
61+
This is a critical part of the threading model that is **orthogonal** to the message-processing strategies above.
62+
63+
**Class:** `quickfix.mina.SessionConnector`
64+
**Source:** `quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java`
65+
66+
### 5.1 The `QFJ Timer` Thread
67+
68+
A single `ScheduledExecutorService` (a shared static instance using a `QFTimerThreadFactory`) runs a `SessionTimerTask` at a fixed rate of **every 1000 ms**.
69+
70+
```java
71+
// SessionConnector.java
72+
private static class QFTimerThreadFactory implements ThreadFactory {
73+
@Override
74+
public Thread newThread(Runnable runnable) {
75+
Thread thread = new Thread(runnable, "QFJ Timer");
76+
thread.setDaemon(true);
77+
return thread;
78+
}
79+
}
80+
```
81+
82+
The timer is started by `startSessionTimer()`:
83+
84+
```java
85+
protected void startSessionTimer() {
86+
if (checkSessionTimerRunning()) {
87+
return;
88+
}
89+
Runnable timerTask = new SessionTimerTask();
90+
if (shortLivedExecutor != null) {
91+
timerTask = new DelegatingTask(timerTask, shortLivedExecutor);
92+
}
93+
sessionTimerFuture = SCHEDULED_EXECUTOR.scheduleAtFixedRate(timerTask, 0, 1000L,
94+
TimeUnit.MILLISECONDS);
95+
}
96+
```
97+
98+
Only one timer is ever started per connector. If `startSessionTimer()` is called again while the timer is still running (e.g. during `createDynamicSession()`), the existing timer is reused.
99+
100+
### 5.2 `SessionTimerTask` Iterates All Sessions and Calls `Session.next()`
101+
102+
```java
103+
private class SessionTimerTask implements Runnable {
104+
@Override
105+
public void run() {
106+
try {
107+
for (Session session : sessions.values()) {
108+
try {
109+
session.next();
110+
} catch (IOException e) {
111+
LogUtil.logThrowable(session.getLog(), "Error in session timer processing", e);
112+
}
113+
}
114+
} catch (Throwable e) {
115+
log.error("Error during timer processing", e);
116+
}
117+
}
118+
}
119+
```
120+
121+
Even though each session may have its own dispatcher thread (in the thread-per-session model), the timer thread also calls `session.next()` directly on every session. This is independent of which `EventHandlingStrategy` is in use.
122+
123+
### 5.3 What Does `Session.next()` (No-arg) Do?
124+
125+
`Session.next()` is called from the timer, **not** from user code. Its Javadoc states:
126+
127+
> Called from the timer-related code in the acceptor/initiator implementations. This is not typically called from application code.
128+
129+
Its responsibilities (from `Session.java`):
130+
131+
1. **Checks if the session is enabled.** If disabled and still logged on, it initiates a Logout.
132+
2. **Checks session schedule.** If outside the configured session time window, it may reset sequence numbers or disconnect. This check is throttled to once per second.
133+
3. **Returns early if not connected** (`hasResponder()` is false).
134+
4. **Handles logon state:** If logon has not been received, it may send a Logon (for initiators) or detect a logon timeout.
135+
5. **Checks logout timeout** if a logout has been sent.
136+
6. **Heartbeat management:**
137+
- If `HeartBtInt == 0`: returns (no heartbeat management).
138+
- If timed out waiting for a heartbeat: disconnects (unless `DisableHeartBeatCheck=Y`).
139+
- If a TestRequest is needed: sends a TestRequest (`generateTestRequest("TEST")`).
140+
- If a Heartbeat is needed: sends a Heartbeat (`generateHeartbeat()`).
141+
142+
The full flow:
143+
144+
```
145+
QFJ Timer thread (every 1 second)
146+
└─► SessionTimerTask.run()
147+
└─► for each Session in sessions.values():
148+
└─► Session.next()
149+
├─ check enabled
150+
├─ check session schedule / reset
151+
├─ check hasResponder()
152+
├─ check logon state (send Logon if initiator)
153+
├─ check logout timeout
154+
└─ heartbeat management
155+
├─ isTimedOut() → disconnect
156+
├─ isTestRequestNeeded() → send TestRequest
157+
└─ isHeartBeatNeeded() → send Heartbeat
158+
```
159+
160+
### 5.4 The Overloaded `Session.next(Message)` — Called by Dispatchers
161+
162+
The `Session.next(Message message)` overload is what `MessageDispatchingThread` and `SessionMessageEvent` call with an actual FIX message. This processes the received message (validates, dispatches to `fromAdmin` / `fromApp`, handles sequence numbers, etc.). This is **distinct** from the no-arg `Session.next()` used by the timer.
163+
164+
---
165+
166+
## 6. Thread Interaction Summary
167+
168+
```
169+
┌─────────────────────────────────────────────────────────────────────────┐
170+
│ MINA I/O Threads │
171+
│ (NIO selector threads, named "NioProcessor-N") │
172+
│ Receive raw bytes → decode FIX message → call EventHandlingStrategy │
173+
└──────────────────────────────┬──────────────────────────────────────────┘
174+
│ onMessage(session, message)
175+
┌───────────────────┴────────────────────┐
176+
│ │
177+
SingleThreaded ThreadPerSession
178+
────────────── ────────────────
179+
One shared queue Per-session queue
180+
One "QFJ Message Processor" One "QF/J Session dispatcher:
181+
thread calls <sessionID>" thread per session
182+
session.next(msg) calls session.next(msg)
183+
184+
Both strategies co-exist with the Timer Thread:
185+
186+
┌─────────────────────────────────────────────────────────────────────────┐
187+
│ QFJ Timer Thread (daemon) │
188+
│ ScheduledExecutorService fires every 1000ms │
189+
│ SessionTimerTask iterates ALL sessions → calls Session.next() │
190+
│ (handles heartbeats, logon, session schedule, timeouts) │
191+
└─────────────────────────────────────────────────────────────────────────┘
192+
```
193+
194+
---
195+
196+
## 7. Queue Capacity and Back-Pressure
197+
198+
Both strategies support configurable queue capacity:
199+
200+
- **Fixed capacity:** `new SingleThreadedEventHandlingStrategy(connector, queueCapacity)` — bounded `LinkedBlockingQueue`. Producers block when full (back-pressure).
201+
- **Watermark-based:** `new SingleThreadedEventHandlingStrategy(connector, lowerWatermark, upperWatermark)` — uses `QueueTrackers.newMultiSessionWatermarkTracker(...)`. Flow control is applied per-session within the shared queue.
202+
- Same two options exist for `ThreadPerSessionEventHandlingStrategy`, with `newSingleSessionWatermarkTracker` per session.
203+
204+
---
205+
206+
## 8. Custom `Executor` Injection
207+
208+
Both strategies accept a custom `java.util.concurrent.Executor` via `setExecutor(executor)`, called during `start()` from the connector. This allows integration with application-managed thread pools (e.g. virtual threads in Java 21+):
209+
210+
```java
211+
// Example: use virtual threads for session dispatchers (Java 21+)
212+
ThreadedSocketAcceptor acceptor = new ThreadedSocketAcceptor(...);
213+
acceptor.start(); // internally calls eventHandlingStrategy.setExecutor(longLivedExecutor)
214+
```
215+
216+
The `longLivedExecutor` is provided by `SessionConnector` and can be customised. If no executor is set, `DedicatedThreadExecutor` creates a plain `new Thread(...)` per session/strategy.
217+
218+
---
219+
220+
## 9. Thread Safety Implications for Application Developers
221+
222+
- **`SocketAcceptor` / `SocketInitiator` (single-threaded):** The `Application` callbacks (`fromApp`, `fromAdmin`, etc.) are called from the single `QFJ Message Processor` thread. No concurrent calls to the same session. However, `Session.next()` (timer) runs concurrently from the `QFJ Timer` thread — it does not call application callbacks but it does send messages on the wire.
223+
- **`ThreadedSocketAcceptor` / `ThreadedSocketInitiator` (thread-per-session):** Each session has its own dispatcher thread. Callbacks for **different sessions** may execute concurrently. Your `Application` implementation **must be thread-safe** if it shares state across sessions.
224+
- In both models, the `QFJ Timer` thread runs concurrently with message-processing threads and calls `Session.next()` (no-arg), which may send heartbeats or disconnect. `Session` internally synchronizes on `this` to protect shared state.
225+
226+
---

0 commit comments

Comments
 (0)