Skip to content

Commit cabf53f

Browse files
authored
Merge pull request #7008 from FlowFuse/expert/pubsub
Expert pubsub backend
2 parents c3a7fcb + 7cc82bd commit cabf53f

12 files changed

Lines changed: 1084 additions & 4 deletions

File tree

forge/comms/aclManager.js

Lines changed: 196 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,25 @@
77
* Other components (ie EE-specific features) can register their own additional ACLs
88
*/
99
module.exports = function (app) {
10+
const expertRbacToolCheck = async (teamMembership, application, toolName) => {
11+
const applicationHash = typeof application === 'object' ? application.hashid : application
12+
if (toolName === 'expert:status-message') {
13+
return true
14+
}
15+
// TODO: Understand all automations and which permissions they should require.
16+
// For now, basic starter automations are added here, any not matching this list will require project:flows:edit permission
17+
const toolAccessPermission = {
18+
'automation:select-nodes': 'project:flows:view',
19+
'automation:get-nodes': 'project:flows:view',
20+
'automation:get-flows': 'project:flows:view'
21+
}
22+
const requiredPermission = toolAccessPermission[toolName] || 'project:flows:edit' // default to highest level of access if tool isn't in the list, to be safe
23+
if (!app.hasPermission(teamMembership, requiredPermission, { applicationId: applicationHash })) {
24+
return false
25+
}
26+
return true
27+
}
28+
1029
// Standard set of verify functions to ensure the request meets particular criteria
1130
const verifyFunctions = {
1231
checkTeamAndObjectIds: async function (requestParts, ids) {
@@ -95,6 +114,142 @@ module.exports = function (app) {
95114
// postgres with throw over, unlike sqlite that returns no results.
96115
return false
97116
}
117+
},
118+
checkExpertTopic: async function (topicParts, usernameParts, acl) {
119+
// topicParts = [ fullTopic , <userid>, <sessionid>, <entityType>, <entityId> [, <inflightType>] ]
120+
// usernameParts = [ 'expert-client' | 'expert-agent', <userid> [, <sessionid>] ]
121+
// acl = { channel: 'inflight'|'chat', isPub: true/false, isSub: true/false, isClient: true/false, isAgent: true/false, allowWildcard: { user: true/false, session: true/false, entity: true/false } }
122+
123+
const ValidationError = function (message) {
124+
const error = new Error(message)
125+
error.name = 'ACLValidationError'
126+
return error
127+
}
128+
129+
try {
130+
const [, userId, sessionId, entityType, entityId, inflightType] = topicParts
131+
const [clientType, usernameUserId, usernameSessionId] = usernameParts
132+
133+
const isInflight = acl.channel === 'inflight'
134+
const isChat = acl.channel === 'chat'
135+
const validateUserMatch = acl.isClient
136+
const validateUserExists = acl.isClient || (acl.isAgent && acl.isPub)
137+
const validateSession = acl.isClient
138+
139+
// ensure correct selected acl for the client
140+
if (acl.isAgent && clientType !== 'expert-agent') {
141+
throw ValidationError('invalid client type - expected an expert-agent client')
142+
} else if (!acl.isAgent && clientType !== 'expert-client') {
143+
throw ValidationError('invalid client type - expected an expert-client client')
144+
}
145+
146+
// ensure topic part count is valid for the type of subscription
147+
if (isInflight && topicParts.length !== 6) {
148+
throw ValidationError('topic is invalid')
149+
} else if (isChat && topicParts.length !== 5) {
150+
throw ValidationError('topic is invalid')
151+
}
152+
153+
if (!userId) {
154+
throw ValidationError('invalid userId')
155+
}
156+
if (validateUserMatch && userId !== usernameUserId) {
157+
throw ValidationError('userId does not match')
158+
}
159+
if (acl.allowWildcard?.user && userId === '+') {
160+
// this is valid, the client is subscribing to all topics for this session
161+
} else if (validateUserExists) {
162+
const user = await app.db.models.User.byId(userId)
163+
if (!user) {
164+
throw ValidationError('userId does not exist')
165+
}
166+
}
167+
168+
// at minimum, ensure session is present 8 or more chars
169+
if (!sessionId) {
170+
throw ValidationError('invalid sessionId')
171+
} else {
172+
if (acl.allowWildcard?.session && sessionId === '+') {
173+
// this is valid for agent subs (as they service all sessions for the users)
174+
} else if (validateSession && sessionId !== usernameSessionId) {
175+
throw ValidationError('sessionId does not match')
176+
} else if (sessionId.length < 8) {
177+
throw ValidationError('invalid sessionId')
178+
}
179+
}
180+
181+
// not all inflight subscriptions require an entity, but if one is provided it must be valid
182+
let teamId
183+
let applicationHash
184+
let isWildcardEntity = false
185+
if (acl.allowWildcard?.entity && (entityType === '+' || entityId === '+')) {
186+
if (entityType !== '+' && entityId !== '+') {
187+
throw ValidationError('invalid entity wildcards - both entityType and entityId must be wildcarded together')
188+
}
189+
isWildcardEntity = true
190+
} else if (entityType === 'p') {
191+
const project = await app.db.models.Project.byId(entityId)
192+
if (!project) {
193+
throw ValidationError('project does not exist')
194+
} else {
195+
teamId = project.TeamId
196+
applicationHash = project.Application?.hashid || app.db.models.Application.encodeHashid(project.ApplicationId)
197+
}
198+
} else if (entityType === 'd') {
199+
const device = await app.db.models.Device.byId(entityId)
200+
if (!device) {
201+
throw ValidationError('device does not exist')
202+
} else {
203+
teamId = device.TeamId
204+
applicationHash = device.Application?.hashid || app.db.models.Application.encodeHashid(device.ApplicationId)
205+
}
206+
} else if (entityType === 'a') {
207+
const application = await app.db.models.Application.byId(entityId)
208+
if (!application) {
209+
throw ValidationError('application does not exist')
210+
} else {
211+
teamId = application.TeamId
212+
applicationHash = application.hashid
213+
}
214+
} else if (entityType === 't') {
215+
const team = await app.db.models.Team.byId(entityId)
216+
if (!team) {
217+
throw ValidationError('team does not exist')
218+
} else {
219+
teamId = team.id
220+
applicationHash = null // NA
221+
}
222+
} else {
223+
throw ValidationError('invalid entity')
224+
}
225+
226+
// must be member of a team
227+
if (!isWildcardEntity) {
228+
const teamMembership = await app.db.models.TeamMember.getTeamMembership(userId, teamId, false)
229+
if (!teamMembership) {
230+
throw ValidationError('user is not a member of the team that owns this project')
231+
}
232+
233+
// if this is an inflight channel messages we must validate the user has appropriate RBAC permission
234+
if (isInflight) {
235+
const result = await expertRbacToolCheck(teamMembership, applicationHash, inflightType)
236+
if (!result) {
237+
throw ValidationError('user does not have permission to access this inflight topic')
238+
}
239+
}
240+
}
241+
242+
return true
243+
} catch (error) {
244+
if (error.name === 'ACLValidationError') {
245+
// ↓ Useful for debugging ↓
246+
// console.warn('ACL DENY:', { topicParts, usernameParts, acl, reason: error.message })
247+
} else {
248+
// unexpected error during ACL checking - log to app
249+
app.log.error('Unexpected error during ACL check', { topicParts, usernameParts, acl, error })
250+
}
251+
return false
252+
}
98253
}
99254
}
100255

@@ -191,16 +346,46 @@ module.exports = function (app) {
191346
// - ff/v1/<team>/d/<device/resources/heartbeat
192347
{ topic: /^ff\/v1\/([^/]+)\/d\/([^/]+)\/resources\/heartbeat$/, verify: 'checkDeviceIsAssigned' }
193348
]
349+
},
350+
// frontend client (user)
351+
expertClient: {
352+
sub: [
353+
// topic: ff/v1/expert/<userid>/<sessionid>/<a|p|d|t>/<appid|projid|devid|teamid>/support/chat/response
354+
// topic captures, 0 = full topic, 1 = userid, 2 = sessionid, 3 = entity type (a|p|d|t), 4 = entity id, 5 = inflight type (only for inflight topics)
355+
// example topic: ff/v1/expert/user123/session123/p/abc-123-456-789/support/chat/response
356+
{ topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\/support\/chat\/response$/, verify: 'checkExpertTopic', channel: 'chat', allowWildcard: { entity: true }, isClient: true, isSub: true },
357+
{ topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\/support\/inflight\/([^/]+)\/request$/, verify: 'checkExpertTopic', channel: 'inflight', allowWildcard: { entity: true, inflightType: true }, isClient: true, isSub: true }
358+
],
359+
pub: [
360+
// topic: ff/v1/expert/<userid>/<sessionid>/<a|p|d|t>/<appid|projid|devid|teamid>/support/chat/request
361+
// topic captures, 0 = full topic, 1 = userid, 2 = sessionid, 3 = entity type (a|p|d|t), 4 = entity id, 5 = inflight type (only for inflight topics)
362+
// example topic: ff/v1/expert/user123/session123/p/abc-111-222-333/support/chat/request
363+
{ topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([tapd])\/([^/]+)\/support\/chat\/request$/, verify: 'checkExpertTopic', channel: 'chat', isClient: true, isPub: true },
364+
{ topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([tapd])\/([^/]+)\/support\/inflight\/([^/]+)\/response$/, verify: 'checkExpertTopic', channel: 'inflight', isClient: true, isPub: true }
365+
]
366+
},
367+
// backend client (agent)
368+
expertAgent: {
369+
sub: [
370+
{ topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\/support\/chat\/request$/, verify: 'checkExpertTopic', channel: 'chat', allowWildcard: { user: true, session: true, entity: true }, isAgent: true, isSub: true },
371+
{ topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\/support\/inflight\/([^/]+)\/response$/, verify: 'checkExpertTopic', channel: 'inflight', allowWildcard: { user: true, session: true, entity: true, inflightType: true }, isAgent: true, isSub: true }
372+
],
373+
pub: [
374+
{ topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\/support\/chat\/response$/, verify: 'checkExpertTopic', channel: 'chat', isAgent: true, isPub: true },
375+
{ topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\/support\/inflight\/([^/]+)\/request$/, verify: 'checkExpertTopic', channel: 'inflight', isAgent: true, isPub: true }
376+
]
194377
}
195378
}
196379

197380
return {
198381
verify: async function (username, topic, accessLevel) {
199-
// Four types of client
382+
// Types of client
200383
// - forge_platform
201384
// - project:<teamid>:<projectid>
202385
// - device:<teamid>:<deviceid>
203386
// - frontend:<teamid>:<deviceid>
387+
// - expert-client:<userid>:<sessionid>
388+
// - expert-agent:<userid>:<apiversion>
204389

205390
let allowed = false
206391
let aclList = []
@@ -216,6 +401,10 @@ module.exports = function (app) {
216401
aclList = ACLS.device[aclType]
217402
} else if (/^frontend:/.test(username)) {
218403
aclList = ACLS.frontend[aclType]
404+
} else if (/^expert-agent:/.test(username)) {
405+
aclList = ACLS.expertAgent[aclType]
406+
} else if (/^expert-client:/.test(username)) {
407+
aclList = ACLS.expertClient[aclType]
219408
} else {
220409
return false
221410
}
@@ -244,7 +433,12 @@ module.exports = function (app) {
244433
// This isn't allowed to be a sharedSub
245434
break
246435
} else if (acl.verify && verifyFunctions[acl.verify]) {
247-
allowed = await verifyFunctions[acl.verify](m, usernameParts, acl)
436+
try {
437+
allowed = await verifyFunctions[acl.verify](m, usernameParts, acl)
438+
} catch (err) {
439+
allowed = false
440+
app.log.error('Error in ACL verify function', { error: err, topic, username, acl })
441+
}
248442
// ↓ Useful for debugging ↓
249443
// if (allowed !== true) {
250444
// console.log('DENIED!', topic, acl)

forge/comms/v2AuthRoutes.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ module.exports = async function (app) {
3434
if ((username.startsWith('device:') && password.startsWith('ffbd_')) ||
3535
(username.startsWith('project:') && password.startsWith('ffbp_')) ||
3636
(username.startsWith('frontend:') && password.startsWith('ffbf_')) ||
37+
(username.startsWith('expert-agent:') && password.startsWith('ffbea_')) ||
38+
(username.startsWith('expert-client:') && password.startsWith('ffbec_')) ||
3739
(username === 'forge_platform')) {
3840
const isValid = await app.db.controllers.BrokerClient.authenticateCredentials(
3941
username,
@@ -136,6 +138,8 @@ module.exports = async function (app) {
136138
if ((username.startsWith('device:') ||
137139
username.startsWith('project:') ||
138140
username.startsWith('frontend:') ||
141+
username.startsWith('expert-agent:') ||
142+
username.startsWith('expert-client:') ||
139143
username === 'forge_platform') && !username.includes('@')) {
140144
const acc = action === 'subscribe' ? 1 : 2
141145
const allowed = await app.comms.aclManager.verify(username, topic, acc)

forge/db/controllers/BrokerClient.js

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ module.exports = {
1010
attributes: ['username', 'password']
1111
})
1212
if (compareHash(password || '', user ? user.password : '')) {
13-
if (username.startsWith('frontend:')) {
13+
if (username.startsWith('frontend:') || username.startsWith('expert-client:')) {
1414
await user.destroy()
1515
}
1616
return true
@@ -140,5 +140,75 @@ module.exports = {
140140
}
141141
}
142142
return null
143+
},
144+
145+
createClientForExpertAgent: async function (app) {
146+
if (app.comms) {
147+
const username = 'expert-agent:api:v1'
148+
const password = generateToken(32, 'ffbea') // ff broker expert agent
149+
const [client, created] = await app.db.models.BrokerClient.findOrCreate({
150+
where: {
151+
username
152+
},
153+
defaults: {
154+
password,
155+
ownerId: '',
156+
ownerType: 'platform'
157+
}
158+
})
159+
// if it was created, the password is already set. If not, we need to update it with a new one.
160+
if (!created) {
161+
client.password = password
162+
await client.save()
163+
}
164+
await app.settings.set('platform:expert-agent:creds', true)
165+
return {
166+
username,
167+
password
168+
}
169+
}
170+
return null
171+
},
172+
173+
removeClientForExpertAgent: async function (app) {
174+
if (app.comms) {
175+
await app.db.models.BrokerClient.destroy({
176+
where: {
177+
username: 'expert-agent:api:v1'
178+
}
179+
})
180+
await app.settings.set('platform:expert-agent:creds', false)
181+
}
182+
return null
183+
},
184+
185+
createClientForExpertClient: async function (app, user, sessionId) {
186+
if (app.comms) {
187+
const existingClient = await app.db.models.BrokerClient.findOne({
188+
where: {
189+
ownerId: '' + user.id,
190+
ownerType: 'expert-user',
191+
username: `expert-client:${user.hashid}:${sessionId}`
192+
}
193+
})
194+
if (existingClient) {
195+
await existingClient.destroy()
196+
}
197+
198+
const username = `expert-client:${user.hashid}:${sessionId}`
199+
const password = generateToken(32, 'ffbec') // ff broker expert client
200+
await app.db.models.BrokerClient.create({
201+
username,
202+
password,
203+
ownerId: '' + user.id,
204+
ownerType: 'expert-user'
205+
})
206+
return {
207+
url: app.config.broker.public_url || app.config.broker.url || null,
208+
username,
209+
password
210+
}
211+
}
212+
return null
143213
}
144214
}

forge/lib/permissions.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ const Permissions = {
123123
'platform:debug': { description: 'View platform debug information', role: Roles.Admin },
124124
'platform:stats': { description: 'View platform stats information', role: Roles.Admin },
125125
'platform:stats:token': { description: 'Create/Delete platform stats token', role: Roles.Admin },
126+
'platform:expert-agent:creds': { description: 'Create/Delete expert agent credentials', role: Roles.Admin },
126127
'platform:audit-log': { description: 'View platform audit log', role: Roles.Admin },
127128

128129
/**

0 commit comments

Comments
 (0)