Skip to content

Commit 550189c

Browse files
authored
Merge pull request #6144 from FlowFuse/agent-topic-upload-cache
Add caching to MQTT Agent Topic insertion
2 parents f5a397c + f0548e0 commit 550189c

4 files changed

Lines changed: 139 additions & 16 deletions

File tree

forge/ee/lib/teamBroker/index.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ module.exports.init = async function (app) {
4141
*/
4242
async function addUsedTopic (topic, team) {
4343
const teamId = app.db.models.Team.decodeHashid(team)
44-
const cacheHit = topicCache.get(`${teamId}#${topic}`)
44+
const cacheHit = topicCache.get(`${teamId[0]}#${topic}`)
4545
if (!cacheHit) {
4646
await app.db.models.MQTTTopicSchema.upsert({
4747
topic,
@@ -76,7 +76,13 @@ module.exports.init = async function (app) {
7676
app.log.debug(`Error populating Team Broker Topic Cache ${err.toString()}`)
7777
}
7878

79+
function removeTopicFromCache (topic, team) {
80+
const teamId = app.db.models.Team.decodeHashid(team)
81+
topicCache.delete(`${teamId[0]}#${topic.topic}`)
82+
}
83+
7984
app.decorate('teamBroker', {
80-
addUsedTopic
85+
addUsedTopic,
86+
removeTopicFromCache
8187
})
8288
}

forge/ee/routes/teamBroker/3rdPartyBroker.js

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
const { LRUCache } = require('lru-cache')
2+
13
module.exports = async function (app) {
24
app.addHook('preHandler', async (request, reply) => {
35
if (request.params.teamId !== undefined || request.params.teamSlug !== undefined) {
@@ -545,6 +547,12 @@ module.exports = async function (app) {
545547
reply.send(clean)
546548
})
547549

550+
// set up topic cache
551+
const topicCache = new LRUCache({
552+
max: 5000,
553+
ttl: 1000 * 60 * 30 // 30 min cache life
554+
})
555+
548556
/**
549557
* Store Topics from a 3rd Party Broker
550558
* @name /api/v1/teams/:teamId/brokers/:brokerId/topics
@@ -626,10 +634,14 @@ module.exports = async function (app) {
626634
topicObj.metadata = topicInfo.metadata
627635
}
628636
try {
629-
await app.db.models.MQTTTopicSchema.upsert(topicObj, {
630-
fields: ['inferredSchema', 'metadata'],
631-
conflictFields: ['topic', 'TeamId', 'BrokerCredentialsId']
632-
})
637+
const cacheHit = topicCache.get(`${teamId}#${brokerId}#${topicInfo.topic}`)
638+
if (!cacheHit) {
639+
await app.db.models.MQTTTopicSchema.upsert(topicObj, {
640+
fields: ['inferredSchema', 'metadata'],
641+
conflictFields: ['topic', 'TeamId', 'BrokerCredentialsId']
642+
})
643+
topicCache.set(`${teamId}#${brokerId}#${topicInfo.topic}`, true)
644+
}
633645
} catch (err) {
634646
// reply.status(500).send({ error: 'unknown_erorr', message: err.toString() })
635647
// return
@@ -727,12 +739,21 @@ module.exports = async function (app) {
727739
}
728740
}, async (request, reply) => {
729741
let brokerId = request.params.brokerId
742+
let teamBroker = false
730743
if (brokerId === 'team-broker') {
744+
teamBroker = true
731745
brokerId = app.settings.get('team:broker:creds')
732746
}
733747
const topic = await app.db.models.MQTTTopicSchema.get(request.params.teamId, brokerId, request.params.topicId)
734748
if (topic) {
735749
await topic.destroy()
750+
if (teamBroker) {
751+
app.teamBroker.removeTopicFromCache(topic, request.params.teamId)
752+
} else {
753+
const team = app.db.models.Team.decodeHashid(request.params.teamId)[0]
754+
const broker = brokerId = app.db.models.BrokerCredentials.decodeHashid(request.params.brokerId)[0]
755+
topicCache.delete(`${team}#${broker}#${topic.topic}`)
756+
}
736757
reply.status(201).send({})
737758
} else {
738759
reply.status(404).send({ code: 'not_found', error: 'not found' })

test/unit/forge/comms/authRoutesV2_spec.js

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,11 @@ describe('Broker Auth v2 API', async function () {
600600
})
601601
})
602602
describe('Team Broker Topic Update Cache', async function () {
603+
async function sleep (seconds) {
604+
return new Promise((resolve) => {
605+
setTimeout(resolve, (1000 * 3))
606+
})
607+
}
603608
it('should not update if multiple calls to the same topic', async function () {
604609
const topic = 'update/topic/timestamp'
605610
await app.teamBroker.addUsedTopic(topic, TestObjects.ATeam.hashid)
@@ -618,6 +623,26 @@ describe('Broker Auth v2 API', async function () {
618623
})
619624
secondTopic[0].updatedAt.toISOString().should.equal(firstTopic[0].updatedAt.toISOString())
620625
})
626+
it('should delete from cache', async function () {
627+
const topic = 'update/topic/timestamp/2'
628+
await app.teamBroker.addUsedTopic(topic, TestObjects.ATeam.hashid)
629+
const firstTopic = await app.db.models.MQTTTopicSchema.findAll({
630+
where: {
631+
topic,
632+
TeamId: TestObjects.ATeam.id
633+
}
634+
})
635+
app.teamBroker.removeTopicFromCache(firstTopic[0], TestObjects.ATeam.hashid)
636+
await sleep(3)
637+
await app.teamBroker.addUsedTopic(topic, TestObjects.ATeam.hashid)
638+
const secondTopic = await app.db.models.MQTTTopicSchema.findAll({
639+
where: {
640+
topic,
641+
TeamId: TestObjects.ATeam.id
642+
}
643+
})
644+
secondTopic[0].updatedAt.toISOString().should.not.equal(firstTopic[0].updatedAt.toISOString())
645+
})
621646
})
622647
})
623648
})

test/unit/forge/ee/routes/teamBroker/3rdPartyBroker_spec.js

Lines changed: 81 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ describe('3rd Party Broker API', function () {
344344
cookies: { sid: TestObjects.tokens.bob },
345345
body: [
346346
{
347-
topic: 'bar/baz/qux',
347+
topic: 'bar/baz/qux/x',
348348
metadata: { description: 'a topic' }
349349
}
350350
]
@@ -359,18 +359,21 @@ describe('3rd Party Broker API', function () {
359359
})
360360
response.statusCode.should.equal(200)
361361
const result = response.json()
362-
result.topics.should.have.a.lengthOf(3)
362+
result.topics.should.have.a.lengthOf(4)
363363
const topics = result.topics
364364
topics.sort((A, B) => A.topic.localeCompare(B.topic))
365365

366366
topics[0].should.have.property('topic', 'bar/baz/qux')
367-
topics[0].should.have.property('metadata', { description: 'a topic' })
367+
topics[0].should.have.property('metadata', {})
368368

369-
topics[1].should.have.property('topic', 'foo/bar/baz')
370-
topics[1].should.have.property('metadata', {})
369+
topics[1].should.have.property('topic', 'bar/baz/qux/x')
370+
topics[1].should.have.property('metadata', { description: 'a topic' })
371371

372-
topics[2].should.have.property('topic', 'foo/bar/baz/qux')
372+
topics[2].should.have.property('topic', 'foo/bar/baz')
373373
topics[2].should.have.property('metadata', {})
374+
375+
topics[3].should.have.property('topic', 'foo/bar/baz/qux')
376+
topics[3].should.have.property('metadata', {})
374377
})
375378
it('Get Topics for 3rd Pary broker as a Team Owner', async function () {
376379
const response = await app.inject({
@@ -380,7 +383,7 @@ describe('3rd Party Broker API', function () {
380383
})
381384
response.statusCode.should.equal(200)
382385
const result = response.json()
383-
result.topics.should.have.a.lengthOf(3)
386+
result.topics.should.have.a.lengthOf(4)
384387
})
385388
it('Add Metadata to a Topic', async function () {
386389
let response = await app.inject({
@@ -390,7 +393,7 @@ describe('3rd Party Broker API', function () {
390393
})
391394
response.statusCode.should.equal(200)
392395
let result = response.json()
393-
result.topics.should.have.a.lengthOf(3)
396+
result.topics.should.have.a.lengthOf(4)
394397
result.topics[0].should.have.property('id')
395398
result.topics[0].should.have.property('topic')
396399
const topicId = result.topics[0].id
@@ -420,7 +423,7 @@ describe('3rd Party Broker API', function () {
420423
})
421424
response.statusCode.should.equal(200)
422425
let result = response.json()
423-
result.topics.should.have.a.lengthOf(3)
426+
result.topics.should.have.a.lengthOf(4)
424427
result.topics[0].should.have.property('id')
425428
result.topics[0].should.have.property('topic')
426429
const topicId = result.topics[0].id
@@ -439,8 +442,76 @@ describe('3rd Party Broker API', function () {
439442
})
440443
response.statusCode.should.equal(200)
441444
result = response.json()
442-
result.count.should.equal(2)
445+
result.count.should.equal(3)
443446
})
447+
448+
it('Topic Cache', async function () {
449+
const response = await app.inject({
450+
method: 'POST',
451+
url: `/api/v1/teams/${app.team.hashid}/brokers/${brokerCredentialId}/topics`,
452+
cookies: { sid: TestObjects.tokens.bob },
453+
body: [
454+
{
455+
topic: 'bar/baz/qux',
456+
metadata: { description: 'a topic' }
457+
}
458+
]
459+
})
460+
response.statusCode.should.equal(201)
461+
462+
const responseTopics = await app.inject({
463+
method: 'GET',
464+
url: `/api/v1/teams/${app.team.hashid}/brokers/${brokerCredentialId}/topics`,
465+
cookies: { sid: TestObjects.tokens.bob }
466+
})
467+
const result = responseTopics.json()
468+
469+
const topic = await app.db.models.MQTTTopicSchema.get(app.team.hashid, brokerCredentialId, result.topics[0].id)
470+
await app.inject({
471+
method: 'POST',
472+
url: `/api/v1/teams/${app.team.hashid}/brokers/${brokerCredentialId}/topics`,
473+
cookies: { sid: TestObjects.tokens.bob },
474+
body: [
475+
{
476+
topic: 'bar/baz/qux',
477+
metadata: { description: 'a topic' }
478+
}
479+
]
480+
})
481+
482+
const topicSecond = await app.db.models.MQTTTopicSchema.get(app.team.hashid, brokerCredentialId, result.topics[0].id)
483+
484+
topicSecond.updatedAt.toISOString().should.equal(topic.updatedAt.toISOString())
485+
const response2 = await app.inject({
486+
method: 'DELETE',
487+
url: `/api/v1/teams/${app.team.hashid}/brokers/${brokerCredentialId}/topics/${result.topics[0].id}`,
488+
cookies: { sid: TestObjects.tokens.bob }
489+
})
490+
response2.statusCode.should.equal(201)
491+
492+
await app.inject({
493+
method: 'POST',
494+
url: `/api/v1/teams/${app.team.hashid}/brokers/${brokerCredentialId}/topics`,
495+
cookies: { sid: TestObjects.tokens.bob },
496+
body: [
497+
{
498+
topic: 'bar/baz/qux',
499+
metadata: { description: 'a topic' }
500+
}
501+
]
502+
})
503+
504+
const responseTopics2 = await app.inject({
505+
method: 'GET',
506+
url: `/api/v1/teams/${app.team.hashid}/brokers/${brokerCredentialId}/topics`,
507+
cookies: { sid: TestObjects.tokens.bob }
508+
})
509+
const result2 = responseTopics2.json()
510+
511+
const topicThird = await app.db.models.MQTTTopicSchema.get(app.team.hashid, brokerCredentialId, result2.topics[0].id)
512+
topicThird.updatedAt.toISOString().should.not.equal(topic.updatedAt.toISOString())
513+
})
514+
444515
describe('Team Broker', function () {
445516
before(async function () {
446517
app.team2 = await app.factory.createTeam({ name: 'BTeam' })

0 commit comments

Comments
 (0)