1- import { TaskQueueType } from "@trigger.dev/database" ;
1+ import type { RunEngine } from "@internal/run-engine" ;
2+ import { Prisma , TaskQueueType } from "@trigger.dev/database" ;
3+ import { type PrismaClientOrTransaction } from "~/db.server" ;
24import { type AuthenticatedEnvironment } from "~/services/apiAuth.server" ;
35import { determineEngineVersion } from "~/v3/engineVersion.server" ;
46import { engine } from "~/v3/runEngine.server" ;
57import { BasePresenter } from "./basePresenter.server" ;
68import { toQueueItem } from "./QueueRetrievePresenter.server" ;
79
8- const DEFAULT_ITEMS_PER_PAGE = 25 ;
10+ type QueueListEngine = Pick < RunEngine , "lengthOfQueues" | "currentConcurrencyOfQueues" > ;
11+
12+ export const QUEUE_LIST_DEFAULT_ITEMS_PER_PAGE = 25 ;
913const MAX_ITEMS_PER_PAGE = 100 ;
1014
1115const typeToDBQueueType : Record < "task" | "custom" , TaskQueueType > = {
1216 task : TaskQueueType . VIRTUAL ,
1317 custom : TaskQueueType . NAMED ,
1418} ;
1519
20+ export type QueueListFilteredPagination = {
21+ mode : "filtered" ;
22+ currentPage : number ;
23+ hasMore : boolean ;
24+ } ;
25+
26+ export type QueueListUnfilteredPagination = {
27+ mode : "unfiltered" ;
28+ currentPage : number ;
29+ totalPages : number ;
30+ count : number ;
31+ } ;
32+
33+ export type QueueListPagination = QueueListFilteredPagination | QueueListUnfilteredPagination ;
34+
35+ export type OffsetLimitPagination = {
36+ currentPage : number ;
37+ totalPages : number ;
38+ count : number ;
39+ } ;
40+
41+ /** Maps presenter pagination to the public API / SDK offset-limit contract. */
42+ export function toOffsetLimitQueueListPagination (
43+ pagination : QueueListPagination ,
44+ options : { itemsOnPage : number ; perPage : number }
45+ ) : OffsetLimitPagination {
46+ if ( pagination . mode === "unfiltered" ) {
47+ return {
48+ currentPage : pagination . currentPage ,
49+ totalPages : pagination . totalPages ,
50+ count : pagination . count ,
51+ } ;
52+ }
53+
54+ return {
55+ currentPage : pagination . currentPage ,
56+ totalPages : pagination . hasMore ? pagination . currentPage + 1 : pagination . currentPage ,
57+ count :
58+ ( pagination . currentPage - 1 ) * options . perPage +
59+ options . itemsOnPage +
60+ ( pagination . hasMore ? 1 : 0 ) ,
61+ } ;
62+ }
63+
64+ const queueListSelect = {
65+ friendlyId : true ,
66+ name : true ,
67+ orderableName : true ,
68+ concurrencyLimit : true ,
69+ concurrencyLimitBase : true ,
70+ concurrencyLimitOverriddenAt : true ,
71+ concurrencyLimitOverriddenBy : true ,
72+ type : true ,
73+ paused : true ,
74+ } satisfies Prisma . TaskQueueSelect ;
75+
76+ function buildQueueListWhere (
77+ environmentId : string ,
78+ query : string | undefined ,
79+ type : "task" | "custom" | undefined
80+ ) : Prisma . TaskQueueWhereInput {
81+ const trimmedQuery = query ?. trim ( ) ;
82+
83+ return {
84+ runtimeEnvironmentId : environmentId ,
85+ version : "V2" ,
86+ name : trimmedQuery
87+ ? {
88+ contains : trimmedQuery ,
89+ mode : "insensitive" ,
90+ }
91+ : undefined ,
92+ type : type ? typeToDBQueueType [ type ] : undefined ,
93+ } ;
94+ }
95+
1696export class QueueListPresenter extends BasePresenter {
1797 private readonly perPage : number ;
98+ private readonly engineClient : QueueListEngine ;
1899
19- constructor ( perPage : number = DEFAULT_ITEMS_PER_PAGE ) {
20- super ( ) ;
100+ constructor (
101+ perPage : number = QUEUE_LIST_DEFAULT_ITEMS_PER_PAGE ,
102+ prismaClient ?: PrismaClientOrTransaction ,
103+ replicaClient ?: PrismaClientOrTransaction ,
104+ engineClient : QueueListEngine = engine
105+ ) {
106+ super ( prismaClient , replicaClient ) ;
21107 this . perPage = Math . min ( perPage , MAX_ITEMS_PER_PAGE ) ;
108+ this . engineClient = engineClient ;
22109 }
23110
24111 public async call ( {
@@ -33,26 +120,14 @@ export class QueueListPresenter extends BasePresenter {
33120 perPage ?: number ;
34121 type ?: "task" | "custom" ;
35122 } ) {
36- const hasFilters = ( query !== undefined && query . length > 0 ) || type !== undefined ;
37-
38- // Get total count for pagination
39- const totalQueues = await this . _replica . taskQueue . count ( {
40- where : {
41- runtimeEnvironmentId : environment . id ,
42- version : "V2" ,
43- name : query
44- ? {
45- contains : query ,
46- mode : "insensitive" ,
47- }
48- : undefined ,
49- type : type ? typeToDBQueueType [ type ] : undefined ,
50- } ,
51- } ) ;
123+ const hasFilters = Boolean ( query ?. trim ( ) ) || type !== undefined ;
52124
53- //check the engine is the correct version
54125 const engineVersion = await determineEngineVersion ( { environment } ) ;
55126 if ( engineVersion === "V1" ) {
127+ const totalQueues = await this . _replica . taskQueue . count ( {
128+ where : buildQueueListWhere ( environment . id , query , type ) ,
129+ } ) ;
130+
56131 if ( totalQueues === 0 ) {
57132 const oldQueue = await this . _replica . taskQueue . findFirst ( {
58133 where : {
@@ -78,10 +153,30 @@ export class QueueListPresenter extends BasePresenter {
78153 } ;
79154 }
80155
156+ if ( hasFilters ) {
157+ const { queues, hasMore } = await this . getFilteredQueues ( environment , query , page , type ) ;
158+
159+ return {
160+ success : true as const ,
161+ queues,
162+ pagination : {
163+ mode : "filtered" as const ,
164+ currentPage : page ,
165+ hasMore,
166+ } ,
167+ hasFilters,
168+ } ;
169+ }
170+
171+ const totalQueues = await this . _replica . taskQueue . count ( {
172+ where : buildQueueListWhere ( environment . id , query , type ) ,
173+ } ) ;
174+
81175 return {
82176 success : true as const ,
83- queues : await this . getQueuesWithPagination ( environment , query , page , type ) ,
177+ queues : await this . getUnfilteredQueues ( environment , page , type ) ,
84178 pagination : {
179+ mode : "unfiltered" as const ,
85180 currentPage : page ,
86181 totalPages : Math . ceil ( totalQueues / this . perPage ) ,
87182 count : totalQueues ,
@@ -91,48 +186,68 @@ export class QueueListPresenter extends BasePresenter {
91186 } ;
92187 }
93188
94- private async getQueuesWithPagination (
189+ private async getFilteredQueues (
95190 environment : AuthenticatedEnvironment ,
96191 query : string | undefined ,
97192 page : number ,
98193 type : "task" | "custom" | undefined
99194 ) {
100195 const queues = await this . _replica . taskQueue . findMany ( {
101- where : {
102- runtimeEnvironmentId : environment . id ,
103- version : "V2" ,
104- name : query
105- ? {
106- contains : query ,
107- mode : "insensitive" ,
108- }
109- : undefined ,
110- type : type ? typeToDBQueueType [ type ] : undefined ,
111- } ,
112- select : {
113- friendlyId : true ,
114- name : true ,
115- orderableName : true ,
116- concurrencyLimit : true ,
117- concurrencyLimitBase : true ,
118- concurrencyLimitOverriddenAt : true ,
119- concurrencyLimitOverriddenBy : true ,
120- type : true ,
121- paused : true ,
196+ where : buildQueueListWhere ( environment . id , query , type ) ,
197+ select : queueListSelect ,
198+ orderBy : {
199+ orderableName : "asc" ,
122200 } ,
201+ skip : ( page - 1 ) * this . perPage ,
202+ take : this . perPage + 1 ,
203+ } ) ;
204+
205+ const hasMore = queues . length > this . perPage ;
206+
207+ return {
208+ queues : await this . enrichQueues ( environment , queues . slice ( 0 , this . perPage ) ) ,
209+ hasMore,
210+ } ;
211+ }
212+
213+ private async getUnfilteredQueues (
214+ environment : AuthenticatedEnvironment ,
215+ page : number ,
216+ type : "task" | "custom" | undefined
217+ ) {
218+ const queues = await this . _replica . taskQueue . findMany ( {
219+ where : buildQueueListWhere ( environment . id , undefined , type ) ,
220+ select : queueListSelect ,
123221 orderBy : {
124222 orderableName : "asc" ,
125223 } ,
126224 skip : ( page - 1 ) * this . perPage ,
127225 take : this . perPage ,
128226 } ) ;
129227
228+ return this . enrichQueues ( environment , queues ) ;
229+ }
230+
231+ private async enrichQueues (
232+ environment : AuthenticatedEnvironment ,
233+ queues : {
234+ friendlyId : string ;
235+ name : string ;
236+ orderableName : string | null ;
237+ concurrencyLimit : number | null ;
238+ concurrencyLimitBase : number | null ;
239+ concurrencyLimitOverriddenAt : Date | null ;
240+ concurrencyLimitOverriddenBy : string | null ;
241+ type : TaskQueueType ;
242+ paused : boolean ;
243+ } [ ]
244+ ) {
130245 const results = await Promise . all ( [
131- engine . lengthOfQueues (
246+ this . engineClient . lengthOfQueues (
132247 environment ,
133248 queues . map ( ( q ) => q . name )
134249 ) ,
135- engine . currentConcurrencyOfQueues (
250+ this . engineClient . currentConcurrencyOfQueues (
136251 environment ,
137252 queues . map ( ( q ) => q . name )
138253 ) ,
@@ -149,7 +264,6 @@ export class QueueListPresenter extends BasePresenter {
149264
150265 const overriddenByMap = new Map ( overriddenByUsers . map ( ( u ) => [ u . id , u ] ) ) ;
151266
152- // Transform queues to include running and queued counts
153267 return queues . map ( ( queue ) =>
154268 toQueueItem ( {
155269 friendlyId : queue . friendlyId ,
0 commit comments