Skip to content

Commit 94f2de2

Browse files
committed
COLDBOX-1332 #resolve
Virtual Thread executors support
1 parent f8aae83 commit 94f2de2

6 files changed

Lines changed: 190 additions & 34 deletions

File tree

server-adobe@2023.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
},
2121
"JVM":{
2222
"heapSize":"1024",
23-
"javaVersion":"openjdk11_jre"
23+
"javaVersion":"openjdk21_jre"
2424
},
2525
"cfconfig":{
2626
"file":".cfconfig.json"

system/async/AsyncManager.cfc

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,17 +64,21 @@ component accessors="true" singleton {
6464
* scheduled executor then actually execute scheduled tasks.
6565
*
6666
* Types of Executors:
67+
* - cached : An unbounded pool where the number of threads will grow according to the tasks it needs to service. The threads are killed by a default 60 second timeout if not used and the pool shrinks back
6768
* - fixed : By default it will build one with 20 threads on it. Great for multiple task execution and worker processing
69+
* - fork_join : A pool that uses the ForkJoinPool.commonPool() by default, it is great for parallel tasks and recursive tasks
6870
* - single : A great way to control that submitted tasks will execute in the order of submission: FIFO
69-
* - cached : An unbounded pool where the number of threads will grow according to the tasks it needs to service. The threads are killed by a default 60 second timeout if not used and the pool shrinks back
7071
* - scheduled : A pool to use for scheduled tasks that can run one time or periodically
72+
* - work_stealing : A pool that allows you to run parallel tasks, it will use the ForkJoinPool.commonPool() by default
73+
* - virtual : A pool that uses the VirtualThreadPerTaskExecutor, it is great for IO bound tasks and can run many tasks in parallel without blocking, requires Java 19+.
7174
*
7275
* @see https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
7376
* @name The name of the executor used for registration
74-
* @type The type of executor to build fixed, cached, single, scheduled
77+
* @type The type of executor to build fixed, cached, single, scheduled, work_stealing, virtual
7578
* @threads How many threads to assign to the thread scheduler, default is 20
7679
* @debug Add output debugging
7780
* @loadAppContext Load the CFML App contexts or not, disable if not used
81+
* @parallelism The number of parallel tasks to run at the same time, default is 0 which means no parallelism, only applies to work stealing executors
7882
*
7983
* @return The ColdBox Schedule class to work with the schedule: coldbox.system.async.executors.Executor
8084
*/
@@ -83,7 +87,8 @@ component accessors="true" singleton {
8387
type = "fixed",
8488
numeric threads = this.$executors.DEFAULT_THREADS,
8589
boolean debug = false,
86-
boolean loadAppContext = true
90+
boolean loadAppContext = true,
91+
numeric parallelism = 0
8792
){
8893
// Build it if not found
8994
if ( !variables.executors.keyExists( arguments.name ) ) {
@@ -100,7 +105,7 @@ component accessors="true" singleton {
100105
*
101106
* @see https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/Executors.html
102107
*
103-
* @type Available types are: fixed, cached, single, scheduled, {WireBoxID}
108+
* @type Available types are: fixed, fork_join, cached, single, scheduled, work_stealing, virtual, {WireBoxID}
104109
* @threads The number of threads to seed the executor with, if it allows it
105110
* @debug Add output debugging
106111
* @loadAppContext Load the CFML App contexts or not, disable if not used
@@ -111,33 +116,46 @@ component accessors="true" singleton {
111116
required type,
112117
numeric threads,
113118
boolean debug = false,
114-
boolean loadAppContext = true
119+
boolean loadAppContext = true,
120+
numeric parallelism = 0
115121
){
116122
// Factory to build the right executor
117123
switch ( arguments.type ) {
124+
case "cached": {
125+
arguments.executor = this.$executors.newCachedThreadPool();
126+
return new executors.Executor( argumentCollection = arguments );
127+
}
118128
case "fixed": {
119129
arguments.executor = this.$executors.newFixedThreadPool( arguments.threads );
120130
return new executors.Executor( argumentCollection = arguments );
121131
}
122-
case "cached": {
123-
arguments.executor = this.$executors.newCachedThreadPool();
132+
case "fork_join": {
133+
arguments.executor = this.$executors.newForkJoinPool( arguments.threads );
124134
return new executors.Executor( argumentCollection = arguments );
125135
}
136+
case "scheduled": {
137+
arguments.executor = this.$executors.newScheduledThreadPool( arguments.threads );
138+
return new executors.ScheduledExecutor( argumentCollection = arguments );
139+
}
126140
case "single": {
127141
arguments.executor = this.$executors.newFixedThreadPool( 1 );
128142
return new executors.Executor( argumentCollection = arguments );
129143
}
130-
case "scheduled": {
131-
arguments.executor = this.$executors.newScheduledThreadPool( arguments.threads );
132-
return new executors.ScheduledExecutor( argumentCollection = arguments );
144+
case "work_stealing": {
145+
arguments.executor = this.$executors.newWorkStealingPoolExecutor( arguments.parallelism );
146+
return new executors.Executor( argumentCollection = arguments );
147+
}
148+
case "virtual" : {
149+
arguments.executor = this.$executors.newVirtualThreadExecutor();
150+
return new executors.Executor( argumentCollection = arguments );
133151
}
134152
default: {
135153
}
136154
}
137155
throw(
138156
type = "InvalidExecutorType",
139157
message = "The executor you requested :#arguments.type# does not exist.",
140-
detail = "Valid executors are: fixed, cached, single, scheduled"
158+
detail = "Valid executors are: fixed, fork_join, cached, single, scheduled, virtual, work_stealing, {WireBoxID}"
141159
);
142160
}
143161

system/async/executors/Executor.cfc

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,29 @@ component accessors="true" singleton {
3232
// Prepare the static time unit class
3333
this.$timeUnit = new coldbox.system.async.time.TimeUnit();
3434

35+
variables.features = {
36+
"ScheduledThreadPoolExecutor": {
37+
"pool": true,
38+
"taskMethods": true,
39+
"isTerminating": true
40+
},
41+
"ThreadPoolExecutor": {
42+
"pool": true,
43+
"taskMethods": true,
44+
"isTerminating": true
45+
},
46+
"ForkJoinPool": {
47+
"pool": false,
48+
"taskMethods": false,
49+
"isTerminating": true
50+
},
51+
"ThreadPerTaskExecutor": {
52+
"pool": false,
53+
"taskMethods": false,
54+
"isTerminating": false
55+
}
56+
};
57+
3558
/**
3659
* Constructor
3760
*
@@ -90,6 +113,21 @@ component accessors="true" singleton {
90113
* Executor Utility Methods *
91114
****************************************************************/
92115

116+
/**
117+
* Checks if the executor has a feature based on the type and feature name.
118+
119+
* @feature The feature to check for, e.g., "hasPool", "hasTaskMethods", "hasIsTerminating"
120+
*
121+
* @return true if the feature exists for the type, false otherwise
122+
*/
123+
private boolean function hasFeature( required string feature ){
124+
var classType = variables.native.getClass().getSimpleName();
125+
if( !variables.features.keyExists( classType ) ){
126+
return false;
127+
}
128+
return variables.features[ classType ][ feature ];
129+
}
130+
93131
/**
94132
* Returns true if all tasks have completed following shut down.
95133
*/
@@ -102,7 +140,10 @@ component accessors="true" singleton {
102140
* not completely terminated.
103141
*/
104142
boolean function isTerminating(){
105-
return variables.native.isTerminating();
143+
if( hasFeature( "isTerminating" ) ){
144+
return variables.native.isTerminating();
145+
}
146+
return false;
106147
}
107148

108149
/**
@@ -210,49 +251,49 @@ component accessors="true" singleton {
210251
* Returns the approximate number of threads that are actively executing tasks.
211252
*/
212253
numeric function getActiveCount(){
213-
return variables.native.getActiveCount();
254+
return hasFeature( "taskMethods") ? variables.native.getActiveCount() : 0;
214255
}
215256

216257
/**
217258
* Returns the approximate total number of tasks that have ever been scheduled for execution.
218259
*/
219260
numeric function getTaskCount(){
220-
return variables.native.getTaskCount();
261+
return hasFeature( "taskMethods") ? variables.native.getTaskCount() : 0;
221262
}
222263

223264
/**
224265
* Returns the approximate total number of tasks that have completed execution.
225266
*/
226267
numeric function getCompletedTaskCount(){
227-
return variables.native.getCompletedTaskCount();
268+
return hasFeature( "taskMethods") ? variables.native.getCompletedTaskCount() : 0;
228269
}
229270

230271
/**
231272
* Returns the core number of threads.
232273
*/
233274
numeric function getCorePoolSize(){
234-
return variables.native.getCorePoolSize();
275+
return hasFeature( "pool") ? variables.native.getCorePoolSize() : 0;
235276
}
236277

237278
/**
238279
* Returns the largest number of threads that have ever simultaneously been in the pool.
239280
*/
240281
numeric function getLargestPoolSize(){
241-
return variables.native.getLargestPoolSize();
282+
return hasFeature( "pool") ? variables.native.getLargestPoolSize() : 0;
242283
}
243284

244285
/**
245286
* Returns the maximum allowed number of threads.
246287
*/
247288
numeric function getMaximumPoolSize(){
248-
return variables.native.getMaximumPoolSize();
289+
return hasFeature( "pool") ? variables.native.getMaximumPoolSize() : 0;
249290
}
250291

251292
/**
252293
* Returns the current number of threads in the pool.
253294
*/
254295
numeric function getPoolSize(){
255-
return variables.native.getPoolSize();
296+
return hasFeature( "pool") ? variables.native.getPoolSize() : 0;
256297
}
257298

258299
/**
@@ -273,7 +314,8 @@ component accessors="true" singleton {
273314
"activeCount" : getActiveCount(),
274315
"isTerminated" : isTerminated(),
275316
"isTerminating" : isTerminating(),
276-
"isShutdown" : isShutdown()
317+
"isShutdown" : isShutdown(),
318+
"type" : variables.native.getClass().getName()
277319
};
278320
}
279321

system/async/executors/ExecutorBuilder.cfc

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
/**
22
* Static class to build executors from Java:
33
*
4+
* - CachedThreadPool
45
* - FixedThreadPool
6+
* - ForkJoinPool
57
* - SingleThreadPool
6-
* - CachedThreadPool
78
* - ScheduledThreadPool
9+
* - WorkStealingPool
10+
* - VirtualThreadPool
811
*
9-
* @see https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html
12+
* @see https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/Executors.html
1013
*/
1114
component singleton {
1215

@@ -57,4 +60,49 @@ component singleton {
5760
return variables.jExecutors.newScheduledThreadPool( javacast( "int", arguments.corePoolSize ) );
5861
}
5962

63+
/**
64+
* Create a virtual thread executor that can be used to run tasks in a virtual thread context.
65+
* @see https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/VirtualThreadExecutor.html
66+
* @return VirtualThreadExecutor: The newly created virtual thread executor
67+
*/
68+
function newVirtualThreadExecutor(){
69+
return variables.jExecutors.newVirtualThreadPerTaskExecutor();
70+
}
71+
72+
/**
73+
* Create a work stealing pool executor that can be used to run tasks in a work-stealing context.
74+
* @see https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/WorkStealingPoolExecutor.html
75+
* @return WorkStealingPoolExecutor: The newly created work stealing pool executor
76+
*/
77+
function newWorkStealingPoolExecutor( numeric parallelism = 0 ){
78+
if ( arguments.parallelism > 0 ) {
79+
return variables.jExecutors.newWorkStealingPool( javacast( "int", arguments.parallelism ) );
80+
}
81+
// If no parallelism is specified, use the default behavior
82+
// which is to use the number of available processors.
83+
// This is similar to the default behavior of the Java Executors class.
84+
// This will create a work-stealing pool with a parallelism level
85+
// equal to the number of available processors.
86+
// This is useful for tasks that can benefit from parallel execution.
87+
return variables.jExecutors.newWorkStealingPool();
88+
}
89+
90+
/**
91+
* New ForkJoinPool executor.
92+
*
93+
* @see https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/ForkJoinPool.html
94+
*
95+
* @param maxThreads The maximum number of threads to use in the pool, defaults to 20.
96+
*
97+
* @return ForkJoinPool: The newly created ForkJoinPool
98+
*/
99+
function newForkJoinPool( numeric maxThreads = this.DEFAULT_THREADS ) {
100+
if( maxThreads > 0 ) {
101+
return createObject( "java", "java.util.concurrent.ForkJoinPool" ).init( javacast( "int", arguments.maxThreads ) );
102+
}
103+
// If no maxThreads is specified, use the default behavior
104+
// which is to use the number of available processors.
105+
return createObject( "java", "java.util.concurrent.ForkJoinPool" ).commonPool();
106+
}
107+
60108
}

tests/specs/async/ExecutorServicesSpec.cfc

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,28 +15,56 @@ component extends="BaseAsyncSpec" {
1515
var executor = asyncManager.newExecutor( "unitTest" );
1616
expect( executor.getName() ).toBe( "unitTest" );
1717
expect( executor.getCorePoolSize() ).toBe( 20 );
18+
expect( executor.getStats() ).toBeStruct();
1819
} );
1920
it( "can create the default fixed executor with custom threads", function(){
2021
var executor = asyncManager.newExecutor( name: "unitTest", threads: 100 );
2122
expect( executor.getName() ).toBe( "unitTest" );
2223
expect( executor.getCorePoolSize() ).toBe( 100 );
24+
expect( executor.getStats() ).toBeStruct();
2325
} );
2426
it( "can create the a single executor", function(){
2527
var executor = asyncManager.newExecutor( name: "unitTest", type: "single" );
2628
expect( executor.getName() ).toBe( "unitTest" );
2729
expect( executor.getCorePoolSize() ).toBe( 1 );
30+
expect( executor.getStats() ).toBeStruct();
2831
} );
2932
it( "can create the a cached executor", function(){
3033
var executor = asyncManager.newExecutor( name: "unitTest", type: "cached" );
3134
expect( executor.getName() ).toBe( "unitTest" );
3235
expect( executor.getPoolSize() ).toBe( 0 );
36+
expect( executor.getStats() ).toBeStruct();
37+
} );
38+
it( "can create a fork_join executor", function(){
39+
var executor = asyncManager.newExecutor( name: "fork_join", type: "fork_join" );
40+
expect( executor.getName() ).toBe( "fork_join" );
41+
expect( executor.getPoolSize() ).toBe( 0 );
42+
expect( executor.getStats() ).toBeStruct();
43+
} );
44+
it( "can create a work_stealing executor", function(){
45+
var executor = asyncManager.newExecutor( name: "work_stealing", type: "work_stealing" );
46+
expect( executor.getName() ).toBe( "work_stealing" );
47+
expect( executor.getPoolSize() ).toBe( 0 );
48+
expect( executor.getStats() ).toBeStruct();
3349
} );
3450
it( "can create a scheduled executor", function(){
3551
var executor = asyncManager.newExecutor( name: "unitTest", type: "scheduled" );
3652
expect( executor.getName() ).toBe( "unitTest" );
3753
expect( executor.getCorePoolSize() ).toBe( 20 );
3854
expect( executor.getNative().toString() ).toInclude( "ScheduledThreadPoolExecutor" );
39-
} );
55+
expect( executor.getStats() ).toBeStruct();
56+
} );
57+
// Skip on Adobe as their dumb reflection does not support virtual threads
58+
it(
59+
title: "can create a virtual thread executor",
60+
skip: ( server.keyExists( "coldfusion" ) && server.coldfusion.productName.findNoCase( "ColdFusion" ) ),
61+
body: function(){
62+
var executor = asyncManager.newExecutor( name: "virtual", type: "virtual" );
63+
expect( executor.getName() ).toBe( "virtual" );
64+
expect( executor.getPoolSize() ).toBe( 0 );
65+
expect( executor.getStats() ).toBeStruct();
66+
}
67+
);
4068
it( "can throw an exception if the wrong type is passed", function(){
4169
expect( function(){
4270
asyncManager.newExecutor( name: "unitTest", type: "bogus" );
@@ -117,7 +145,7 @@ component extends="BaseAsyncSpec" {
117145
var executor1 = asyncManager.newExecutor( "unitTest1" );
118146
var statusMap = asyncManager.getExecutorStatusMap( "unitTest1" );
119147

120-
// debug( statusMap );
148+
debug( statusMap );
121149
expect( statusMap.isShutdown ).toBeFalse();
122150
} );
123151
} );

0 commit comments

Comments
 (0)