Skip to content
2 changes: 1 addition & 1 deletion app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ function createApp(config: AppConfig): App {
const harvestQueue = config.harvest.queue()
initializers.push(async () => harvestQueue.initialize())

const recomputeHandler = config.upgrade.service({ queue: config.upgrade.queue })
const recomputeHandler = config.recompute.service({ queue: config.recompute.queue })
initializers.push(async () => recomputeHandler.initialize())

const definitionService = createDefinitionService(
Expand Down
14 changes: 10 additions & 4 deletions bin/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ export interface AppConfig {
definition: {
store: () => DefinitionStore & Initializable
}
upgrade: {
recompute: {
queue: RecomputeQueueFactories<IQueue & Initializable>
service: (options: { queue: RecomputeQueueFactories<IQueue & Initializable> }) => RecomputeHandler & Initializable
}
Expand Down Expand Up @@ -173,9 +173,15 @@ export default {
definition: {
store: loadFactory(config.get('DEFINITION_STORE_PROVIDER') || 'file', 'definition')
},
upgrade: {
queue: loadFactory(config.get('DEFINITION_UPGRADE_QUEUE_PROVIDER') || 'memory', 'upgrade.queue'),
service: loadFactory(config.get('DEFINITION_UPGRADE_PROVIDER') || 'versionCheck', 'upgrade.service')
recompute: {
queue: loadFactory(
config.get('DEFINITION_RECOMPUTE_QUEUE_PROVIDER') || config.get('DEFINITION_UPGRADE_QUEUE_PROVIDER') || 'memory',
'recompute.queue'
),
service: loadFactory(
config.get('DEFINITION_RECOMPUTE_PROVIDER') || config.get('DEFINITION_UPGRADE_PROVIDER') || 'onDemand',
'recompute.service'
)
},
attachment: {
store: loadFactory(config.get('ATTACHMENT_STORE_PROVIDER') || 'file', 'attachment')
Expand Down
3 changes: 2 additions & 1 deletion business/definitionService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -470,13 +470,14 @@ export class DefinitionService {
* Invalidate the definition for the identified component. This flushes any caches and pre-computed
* results. The definition will be recomputed on or before the next use.
*/
invalidate(coordinates: EntityCoordinates | EntityCoordinates[]): Promise<void[]> {
invalidate(coordinates: EntityCoordinates | EntityCoordinates[]): Promise<undefined[]> {
const coordinateList = Array.isArray(coordinates) ? coordinates : [coordinates]
return Promise.all(
coordinateList.map(
throat(10, async (coordinates: EntityCoordinates) => {
await this.definitionStore.delete(coordinates)
await this.cache.delete(this._getCacheKey(coordinates))
return undefined
})
)
)
Expand Down
30 changes: 15 additions & 15 deletions providers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ import crawlerConfig from '../providers/harvest/crawlerConfig.ts'
import crawlerQueueConfig from '../providers/harvest/crawlerQueueConfig.ts'
import winstonConfig from '../providers/logging/winstonConfig.ts'
import memoryQueue from '../providers/queueing/memoryQueue.ts'
import recomputeAzureQueueConfig from '../providers/recompute/azureQueueConfig.ts'
import recomputeMemoryQueueConfig from '../providers/recompute/memoryQueueConfig.ts'
import recomputeHandler from '../providers/recompute/recomputeHandler.ts'
import azureSearchConfig from '../providers/search/azureConfig.ts'
import memorySearch from '../providers/search/memory.ts'
import azblobConfig from '../providers/stores/azblobConfig.ts'
import dispatchConfig from '../providers/stores/dispatchConfig.ts'
import fileConfig from '../providers/stores/fileConfig.ts'
import mongoConfig from '../providers/stores/mongoConfig.ts'
import upgradeAzureQueueConfig from '../providers/upgrade/azureQueueConfig.ts'
import upgradeMemoryQueueConfig from '../providers/upgrade/memoryQueueConfig.ts'
import * as recomputeHandler from '../providers/upgrade/recomputeHandler.ts'
import type { ICache } from './caching/index.js'
import listBasedFilterConfig from './harvest/throttling/listBasedFilterConfig.ts'
import type { Logger } from './logging/index.js'
Expand Down Expand Up @@ -72,14 +72,14 @@ export interface RecomputeQueueFactories<T = any> {
compute: ProviderFactory<T>
}

/** Provider configuration for upgrade queue */
export interface UpgradeQueueProviders {
/** Provider configuration for recompute queue */
export interface RecomputeQueueProviders {
azure: RecomputeQueueFactories
memory: RecomputeQueueFactories
}

/** Provider configuration for upgrade service */
export interface UpgradeServiceProviders {
/** Provider configuration for recompute service */
export interface RecomputeServiceProviders {
onDemand: ProviderFactory
/** @deprecated TODO: remove in favor of onDemand */
versionCheck: ProviderFactory
Expand All @@ -88,10 +88,10 @@ export interface UpgradeServiceProviders {
upgradeQueue: ProviderFactory
}

/** Provider configuration for upgrade */
export interface UpgradeProviders {
queue: UpgradeQueueProviders
service: UpgradeServiceProviders
/** Provider configuration for recompute */
export interface RecomputeProviders {
queue: RecomputeQueueProviders
service: RecomputeServiceProviders
}

/** Provider configuration for curation queue */
Expand Down Expand Up @@ -152,7 +152,7 @@ export interface Providers {
search: SearchProviders
caching: CachingProviders
auth: AuthProviders
upgrade: UpgradeProviders
recompute: RecomputeProviders
curation: CurationProviders
harvest: HarvestProviders
}
Expand Down Expand Up @@ -183,10 +183,10 @@ const providers: Providers = {
auth: {
github: githubAuthConfig
},
upgrade: {
recompute: {
queue: {
azure: upgradeAzureQueueConfig,
memory: upgradeMemoryQueueConfig
azure: recomputeAzureQueueConfig,
memory: recomputeMemoryQueueConfig
},
service: {
onDemand: recomputeHandler.defaultFactory,
Expand Down
14 changes: 7 additions & 7 deletions providers/upgrade/process.ts → providers/recompute/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class QueueHandler {
}
}

class DefinitionUpgrader implements MessageHandler {
class DefinitionRecomputer implements MessageHandler {
logger: Logger
declare _definitionService: DefinitionService
declare _upgradePolicy: UpgradeHandler
Expand All @@ -85,12 +85,12 @@ class DefinitionUpgrader implements MessageHandler {
return needsCompute
})
if (result) {
this.logger.info('Handled definition upgrade', { coordinates: coordinates.toString() })
this.logger.info('Handled definition recompute', { coordinates: coordinates.toString() })
} else {
this.logger.debug('Skipped definition upgrade', { coordinates: coordinates.toString() })
this.logger.debug('Skipped definition recompute', { coordinates: coordinates.toString() })
}
} catch (error) {
const context = `Error handling definition upgrade for ${coordinates.toString()}`
const context = `Error handling definition recompute for ${coordinates.toString()}`
const originalError = error instanceof Error ? error : new Error(String(error))
throw new Error(context, { cause: originalError })
}
Expand All @@ -104,9 +104,9 @@ function setup(
once: boolean = false,
_upgradePolicy: UpgradeHandler = factory({ logger: _logger })
): Promise<void> {
const defUpgrader = new DefinitionUpgrader(_definitionService, _logger, _upgradePolicy)
const queueHandler = new QueueHandler(_queue, _logger, defUpgrader)
const defRecomputer = new DefinitionRecomputer(_definitionService, _logger, _upgradePolicy)
const queueHandler = new QueueHandler(_queue, _logger, defRecomputer)
return queueHandler.work(once)
}

export { DefinitionUpgrader, QueueHandler, setup }
export { DefinitionRecomputer, QueueHandler, setup }
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ function getPolicyName(policy: any): string {
}

/**
* Compatibility alias for DEFINITION_UPGRADE_PROVIDER=versionCheck.
* Factory for DEFINITION_RECOMPUTE_PROVIDER=onDemand (alias: versionCheck).
* Only `logger` is used from options; any other properties (e.g. `queue`) are intentionally ignored.
*/
function defaultFactory({ logger }: Pick<DefinitionVersionCheckerOptions, 'logger'> = {}): RecomputeHandler {
Expand All @@ -118,7 +118,7 @@ function defaultFactory({ logger }: Pick<DefinitionVersionCheckerOptions, 'logge
}

/**
* Compatibility alias for DEFINITION_UPGRADE_PROVIDER=upgradeQueue.
* Factory for DEFINITION_RECOMPUTE_PROVIDER=delayed (alias: upgradeQueue).
*/
function delayedFactory(options: DelayedFactoryOptions = {}): RecomputeHandler {
const shared = {
Expand All @@ -141,3 +141,4 @@ function delayedFactory(options: DelayedFactoryOptions = {}): RecomputeHandler {
}

export { defaultFactory, delayedFactory, RecomputeHandler }
export default { defaultFactory, delayedFactory }
13 changes: 11 additions & 2 deletions test/business/definitionServiceTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import SummaryService from '../../business/summarizer.ts'
import Curation from '../../lib/curation.ts'
import EntityCoordinates from '../../lib/entityCoordinates.ts'
import { setIfValue } from '../../lib/utils.ts'
import memoryQueue from '../../providers/recompute/memoryQueueConfig.ts'
import { defaultFactory, delayedFactory } from '../../providers/recompute/recomputeHandler.ts'
import FileHarvestStore from '../../providers/stores/fileHarvestStore.ts'
import memoryQueue from '../../providers/upgrade/memoryQueueConfig.ts'
import { defaultFactory, delayedFactory } from '../../providers/upgrade/recomputeHandler.ts'
import validator from '../../schemas/validator.ts'
import { createMockLogger, createSilentLogger } from '../helpers/mockLogger.ts'

Expand Down Expand Up @@ -78,6 +78,15 @@ describe('Definition Service', () => {
expect(validator.validate('definition', definition)).to.be.true
})

it('output matches definition returned by get when no tools exist', async () => {
const { service, coordinates } = setup(createDefinition(null, null, null))
const fromGet = await service.get(coordinates)
const fromBuild = service.buildEmptyDefinition(coordinates)
fromGet._meta.updated = 'ignore'
fromBuild._meta.updated = 'ignore'
expect(fromBuild).to.deep.equal(fromGet)
})

it('logs and harvest new definitions with empty tools', async () => {
const { service, coordinates } = setup(createDefinition(null, null, []))
await service.get(coordinates)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ chai.use(chaiAsPromised)

import { expect } from 'chai'
import sinon from 'sinon'
import DefinitionQueueUpgrader from '../../../providers/upgrade/defUpgradeQueue.ts'
import MemoryQueue from '../../../providers/upgrade/memoryQueueConfig.ts'
import DefinitionQueueUpgrader from '../../../providers/recompute/defUpgradeQueue.ts'
import MemoryQueue from '../../../providers/recompute/memoryQueueConfig.ts'

describe('DefinitionQueueUpgrader', () => {
let logger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ chai.use(chaiAsPromised)

import { expect } from 'chai'
import sinon from 'sinon'
import { DefinitionVersionChecker, factory } from '../../../providers/upgrade/defVersionCheck.ts'
import { DefinitionVersionChecker, factory } from '../../../providers/recompute/defVersionCheck.ts'

describe('DefinitionVersionChecker', () => {
let logger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import type { Definition, DefinitionService, RecomputeContext } from '../../../b
import EntityCoordinates from '../../../lib/entityCoordinates.ts'
import type { ICache } from '../../../providers/caching/index.js'
import type { IQueue } from '../../../providers/queueing/index.js'
import { DelayedComputePolicy } from '../../../providers/upgrade/delayedComputePolicy.ts'
import { DelayedComputePolicy } from '../../../providers/recompute/delayedComputePolicy.ts'
import { createMockLogger } from '../../helpers/mockLogger.ts'

const TEST_COORDINATES = 'npm/npmjs/-/leftpad/1.0.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { expect } from 'chai'
import sinon from 'sinon'
import type { RecomputeContext } from '../../../business/definitionService.ts'
import EntityCoordinates from '../../../lib/entityCoordinates.ts'
import { OnDemandComputePolicy } from '../../../providers/upgrade/onDemandComputePolicy.ts'
import { OnDemandComputePolicy } from '../../../providers/recompute/onDemandComputePolicy.ts'

describe('OnDemandComputePolicy', () => {
let policy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ chai.use(chaiAsPromised)
import { expect } from 'chai'
import sinon from 'sinon'
import EntityCoordinates from '../../../lib/entityCoordinates.ts'
import { DefinitionUpgrader, QueueHandler } from '../../../providers/upgrade/process.ts'
import { DefinitionRecomputer, QueueHandler } from '../../../providers/recompute/process.ts'

describe('Definition Upgrade Queue Processing', () => {
describe('Definition Recompute Queue Processing', () => {
let logger

beforeEach(() => {
Expand Down Expand Up @@ -87,13 +87,13 @@ describe('Definition Upgrade Queue Processing', () => {
})
})

describe('DefinitionUpgrader', () => {
describe('DefinitionRecomputer', () => {
const coordinates = 'pypi/pypi/-/test/revision'
const definition = Object.freeze({ coordinates: EntityCoordinates.fromString(coordinates) })
const message = Object.freeze({ data: { coordinates: definition.coordinates } })
let definitionService
let upgradePolicy
let upgrader
let recomputer

beforeEach(() => {
definitionService = {
Expand All @@ -103,13 +103,13 @@ describe('Definition Upgrade Queue Processing', () => {
upgradePolicy = {
validate: sinon.stub()
}
upgrader = new DefinitionUpgrader(definitionService, logger, upgradePolicy)
recomputer = new DefinitionRecomputer(definitionService, logger, upgradePolicy)
})

it('delegates to computeStoreAndCurateIf and logs when definition was recomputed', async () => {
definitionService.computeStoreAndCurateIf.resolves(definition)

await upgrader.processMessage(message)
await recomputer.processMessage(message)
expect(definitionService.computeStoreAndCurateIf.calledOnce).to.be.true
expect(definitionService.computeStoreAndCurateIf.getCall(0).args[0]).to.deep.eq(
EntityCoordinates.fromObject(definition.coordinates)
Expand All @@ -120,23 +120,23 @@ describe('Definition Upgrade Queue Processing', () => {
it('delegates to computeStoreAndCurateIf and logs debug when compute was skipped', async () => {
definitionService.computeStoreAndCurateIf.resolves(undefined)

await upgrader.processMessage(message)
await recomputer.processMessage(message)
expect(definitionService.computeStoreAndCurateIf.calledOnce).to.be.true
expect(logger.debug.calledOnce).to.be.true
expect(logger.info.notCalled).to.be.true
})

it('skips if there is no coordinates', async () => {
await upgrader.processMessage({ data: {} })
await recomputer.processMessage({ data: {} })
expect(definitionService.computeStoreAndCurateIf.notCalled).to.be.true
})

it('handles exception by rethrowing with coordinates and the original error message', async () => {
definitionService.computeStoreAndCurateIf.rejects(new Error('test'))

await expect(upgrader.processMessage(message)).to.be.rejectedWith(
await expect(recomputer.processMessage(message)).to.be.rejectedWith(
Error,
/Error handling definition upgrade for pypi\/pypi\/-\/test\/revision/
/Error handling definition recompute for pypi\/pypi\/-\/test\/revision/
)
})

Expand All @@ -145,7 +145,7 @@ describe('Definition Upgrade Queue Processing', () => {
definitionService.computeStoreAndCurateIf.rejects(originalError)

try {
await upgrader.processMessage(message)
await recomputer.processMessage(message)
expect.fail('should have thrown')
} catch (error: any) {
expect(error.cause).to.equal(originalError)
Expand All @@ -167,7 +167,7 @@ describe('Definition Upgrade Queue Processing', () => {
it('skips compute when policy validates the definition', async () => {
upgradePolicy.validate.resolves(definition) // truthy = valid → predicate returns false → skip

await upgrader.processMessage(message)
await recomputer.processMessage(message)
expect(predicateResult).to.be.false
expect(definitionService.getStored.calledOnceWith(EntityCoordinates.fromObject(definition.coordinates))).to.be
.true
Expand All @@ -178,7 +178,7 @@ describe('Definition Upgrade Queue Processing', () => {
it('proceeds with compute when policy returns falsy', async () => {
upgradePolicy.validate.resolves(undefined) // falsy = stale → predicate returns true → compute

await upgrader.processMessage(message)
await recomputer.processMessage(message)
expect(predicateResult).to.be.true
expect(definitionService.getStored.calledOnceWith(EntityCoordinates.fromObject(definition.coordinates))).to.be
.true
Expand All @@ -199,13 +199,13 @@ describe('Definition Upgrade Queue Processing', () => {
let handler
let definitionService
beforeEach(() => {
let definitionUpgrader
;({ definitionService, definitionUpgrader } = setupDefinitionUpgrader(logger))
let definitionRecomputer
;({ definitionService, definitionRecomputer } = setupDefinitionRecomputer(logger))
queue = {
dequeueMultiple: sinon.stub().resolves([message]),
delete: sinon.stub().resolves()
}
handler = new QueueHandler(queue, logger, definitionUpgrader)
handler = new QueueHandler(queue, logger, definitionRecomputer)
})

it('handles exception and logs the error', async () => {
Expand All @@ -216,7 +216,7 @@ describe('Definition Upgrade Queue Processing', () => {
expect(queue.delete.calledOnce).to.be.true
expect(logger.error.calledOnce).to.be.true
expect(logger.error.args[0][0].message).to.match(
/Error handling definition upgrade for pypi\/pypi\/-\/test\/revision/
/Error handling definition recompute for pypi\/pypi\/-\/test\/revision/
)
})

Expand Down Expand Up @@ -285,14 +285,14 @@ describe('Definition Upgrade Queue Processing', () => {
})
})

function setupDefinitionUpgrader(logger) {
function setupDefinitionRecomputer(logger) {
const definitionService = {
getStored: sinon.stub(),
computeStoreAndCurateIf: sinon.stub()
}
const upgradePolicy = {
validate: sinon.stub()
}
const definitionUpgrader = new DefinitionUpgrader(definitionService as any, logger, upgradePolicy)
return { definitionService, upgradePolicy, definitionUpgrader }
const definitionRecomputer = new DefinitionRecomputer(definitionService as any, logger, upgradePolicy)
return { definitionService, upgradePolicy, definitionRecomputer }
}
Loading
Loading