Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 275 additions & 6 deletions plugins/sftp/service/src/controller/controller.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -751,12 +751,281 @@ describe('automated processing', () => {
expect(archiverSpy.createArchive).toHaveBeenCalledTimes(0)
})

// it('skips processed observations w/ create trigger', async () => {
// })
describe('reconnection logic', () => {
let connectSpy: jasmine.Spy

// it('waits for observations to contain all attachments', async () => {
// })
beforeEach(() => {
connectSpy = SFTPClient.prototype.connect as jasmine.Spy
})

async function flushMicrotasks(n = 20) {
for (let i = 0; i < n; i++) {
await Promise.resolve()
}
}

function createController(
eventRepository: jasmine.SpyObj<MageEventRepository>,
userRepository: jasmine.SpyObj<UserRepository>
) {
return new SftpController(
console,
{ stateRepository, eventRepository, observationRepository, userRepository, attachmentStore },
dbConnection
)
}

it('sets status to disconnected on initial connection failure', async () => {
connectSpy.and.rejectWith(new Error('ECONNREFUSED'))
stateRepository.state = { ...defaultSFTPPluginConfig, interval: 10, enabled: true }

const eventRepository = jasmine.createSpyObj<MageEventRepository>('eventRepository', ['findActiveEvents'])
eventRepository.findActiveEvents.and.resolveTo([])
const userRepository = jasmine.createSpyObj<UserRepository>('userRepo', ['findById'])

spyOn(MongooseSftpObservationRepository.prototype, 'findAllByStatus').and.resolveTo([])
spyOn(MongooseSftpObservationRepository.prototype, 'findLatestSyncedObservationTime').and.resolveTo(null)
spyOn(ArchiverFactory.prototype, 'createArchiver').and.returnValue(newArchiver(ArchiveStatus.Complete))

const controller = createController(eventRepository, userRepository)
await controller.start()

const status = controller.getStatus()
expect(status.connected).toBe(false)
expect(status.lastError).toContain('Connection failed')
expect(status.lastConnectionAttempt).toBeDefined()

await controller.stop()
})

it('retries connection on the next sync interval', async () => {
connectSpy.and.rejectWith(new Error('ECONNREFUSED'))
stateRepository.state = { ...defaultSFTPPluginConfig, interval: 10, enabled: true }
const clockTickMillis = stateRepository.state.interval * 1000 + 1

const eventRepository = jasmine.createSpyObj<MageEventRepository>('eventRepository', ['findActiveEvents'])
eventRepository.findActiveEvents.and.resolveTo([])
const userRepository = jasmine.createSpyObj<UserRepository>('userRepo', ['findById'])

spyOn(MongooseSftpObservationRepository.prototype, 'findAllByStatus').and.resolveTo([])
spyOn(MongooseSftpObservationRepository.prototype, 'findLatestSyncedObservationTime').and.resolveTo(null)
spyOn(ArchiverFactory.prototype, 'createArchiver').and.returnValue(newArchiver(ArchiveStatus.Complete))

const controller = createController(eventRepository, userRepository)
await controller.start()
expect(connectSpy).toHaveBeenCalledTimes(1)
expect(controller.getStatus().connected).toBe(false)

// Make connect succeed for the next sync interval
connectSpy.and.resolveTo()

// Advance clock past sync interval
clock.tick(clockTickMillis)
await flushMicrotasks()

expect(connectSpy).toHaveBeenCalledTimes(2)
expect(controller.getStatus().connected).toBe(true)
expect(controller.getStatus().lastError).toBeUndefined()

await controller.stop()
})

it('keeps retrying on each sync interval until connected', async () => {
let callCount = 0
connectSpy.and.callFake(() => {
callCount++
if (callCount <= 3) {
return Promise.reject(new Error('ECONNREFUSED'))
}
return Promise.resolve()
})
stateRepository.state = { ...defaultSFTPPluginConfig, interval: 10, enabled: true }
const clockTickMillis = stateRepository.state.interval * 1000 + 1

const eventRepository = jasmine.createSpyObj<MageEventRepository>('eventRepository', ['findActiveEvents'])
eventRepository.findActiveEvents.and.resolveTo([])
const userRepository = jasmine.createSpyObj<UserRepository>('userRepo', ['findById'])

spyOn(MongooseSftpObservationRepository.prototype, 'findAllByStatus').and.resolveTo([])
spyOn(MongooseSftpObservationRepository.prototype, 'findLatestSyncedObservationTime').and.resolveTo(null)
spyOn(ArchiverFactory.prototype, 'createArchiver').and.returnValue(newArchiver(ArchiveStatus.Complete))

const controller = createController(eventRepository, userRepository)

// Initial attempt fails (callCount = 1)
await controller.start()
expect(controller.getStatus().connected).toBe(false)

// Second attempt fails (callCount = 2)
clock.tick(clockTickMillis)
await flushMicrotasks()
expect(controller.getStatus().connected).toBe(false)

// Third attempt fails (callCount = 3)
clock.tick(clockTickMillis)
await flushMicrotasks()
expect(controller.getStatus().connected).toBe(false)

// Fourth attempt succeeds (callCount = 4)
clock.tick(clockTickMillis)
await flushMicrotasks()
expect(controller.getStatus().connected).toBe(true)
expect(controller.getStatus().lastError).toBeUndefined()

await controller.stop()
})

it('skips event processing while disconnected', async () => {
connectSpy.and.rejectWith(new Error('ECONNREFUSED'))
stateRepository.state = { ...defaultSFTPPluginConfig, interval: 10, enabled: true }
const clockTickMillis = stateRepository.state.interval * 1000 + 1

const eventRepository = jasmine.createSpyObj<MageEventRepository>('eventRepository', ['findActiveEvents'])
eventRepository.findActiveEvents.and.resolveTo([])
const userRepository = jasmine.createSpyObj<UserRepository>('userRepo', ['findById'])

spyOn(MongooseSftpObservationRepository.prototype, 'findAllByStatus').and.resolveTo([])
spyOn(MongooseSftpObservationRepository.prototype, 'findLatestSyncedObservationTime').and.resolveTo(null)
spyOn(ArchiverFactory.prototype, 'createArchiver').and.returnValue(newArchiver(ArchiveStatus.Complete))

const controller = createController(eventRepository, userRepository)
await controller.start()

// it('processes incomplete observation after timeout', async () => {
// })
// Tick through a sync interval while still disconnected
clock.tick(clockTickMillis)
await flushMicrotasks()

// Events should not have been fetched since we're disconnected
expect(eventRepository.findActiveEvents).not.toHaveBeenCalled()

await controller.stop()
})

it('marks disconnected when close event fires', async () => {
stateRepository.state = { ...defaultSFTPPluginConfig, interval: 10, enabled: true }

const eventRepository = jasmine.createSpyObj<MageEventRepository>('eventRepository', ['findActiveEvents'])
eventRepository.findActiveEvents.and.resolveTo([])
const userRepository = jasmine.createSpyObj<UserRepository>('userRepo', ['findById'])

spyOn(MongooseSftpObservationRepository.prototype, 'findAllByStatus').and.resolveTo([])
spyOn(MongooseSftpObservationRepository.prototype, 'findLatestSyncedObservationTime').and.resolveTo(null)
spyOn(ArchiverFactory.prototype, 'createArchiver').and.returnValue(newArchiver(ArchiveStatus.Complete))

const eventHandlers: Record<string, Function[]> = {}
spyOn(SFTPClient.prototype, 'on').and.callFake(function (this: any, event: string, handler: Function) {
if (!eventHandlers[event]) eventHandlers[event] = []
eventHandlers[event].push(handler)
return this
})

const controller = createController(eventRepository, userRepository)
await controller.start()
expect(controller.getStatus().connected).toBe(true)

// Trigger the captured close handler
eventHandlers['close']?.forEach(h => h())

const status = controller.getStatus()
expect(status.connected).toBe(false)
expect(status.lastError).toContain('Connection closed')

await controller.stop()
})

it('marks disconnected when error event fires', async () => {
stateRepository.state = { ...defaultSFTPPluginConfig, interval: 10, enabled: true }

const eventRepository = jasmine.createSpyObj<MageEventRepository>('eventRepository', ['findActiveEvents'])
eventRepository.findActiveEvents.and.resolveTo([])
const userRepository = jasmine.createSpyObj<UserRepository>('userRepo', ['findById'])

spyOn(MongooseSftpObservationRepository.prototype, 'findAllByStatus').and.resolveTo([])
spyOn(MongooseSftpObservationRepository.prototype, 'findLatestSyncedObservationTime').and.resolveTo(null)
spyOn(ArchiverFactory.prototype, 'createArchiver').and.returnValue(newArchiver(ArchiveStatus.Complete))

const eventHandlers: Record<string, Function[]> = {}
spyOn(SFTPClient.prototype, 'on').and.callFake(function (this: any, event: string, handler: Function) {
if (!eventHandlers[event]) eventHandlers[event] = []
eventHandlers[event].push(handler)
return this
})

const controller = createController(eventRepository, userRepository)
await controller.start()
expect(controller.getStatus().connected).toBe(true)

// Trigger the captured error handler
eventHandlers['error']?.forEach(h => h(new Error('Network unreachable')))

const status = controller.getStatus()
expect(status.connected).toBe(false)
expect(status.lastError).toContain('Network unreachable')

await controller.stop()
})

it('does not attempt reconnect after stop', async () => {
connectSpy.and.rejectWith(new Error('ECONNREFUSED'))
stateRepository.state = { ...defaultSFTPPluginConfig, interval: 10, enabled: true }
const clockTickMillis = stateRepository.state.interval * 1000 + 1

const eventRepository = jasmine.createSpyObj<MageEventRepository>('eventRepository', ['findActiveEvents'])
eventRepository.findActiveEvents.and.resolveTo([])
const userRepository = jasmine.createSpyObj<UserRepository>('userRepo', ['findById'])

spyOn(MongooseSftpObservationRepository.prototype, 'findAllByStatus').and.resolveTo([])
spyOn(MongooseSftpObservationRepository.prototype, 'findLatestSyncedObservationTime').and.resolveTo(null)
spyOn(ArchiverFactory.prototype, 'createArchiver').and.returnValue(newArchiver(ArchiveStatus.Complete))

const controller = createController(eventRepository, userRepository)
await controller.start()
expect(connectSpy).toHaveBeenCalledTimes(1)

// Stop before the next interval fires
await controller.stop()

// Advance clock past multiple sync intervals
clock.tick(clockTickMillis * 3)
await flushMicrotasks()

// No additional connect attempts after stop
expect(connectSpy).toHaveBeenCalledTimes(1)
})

it('ignores connection loss events after stop', async () => {
stateRepository.state = { ...defaultSFTPPluginConfig, interval: 10, enabled: true }

const eventRepository = jasmine.createSpyObj<MageEventRepository>('eventRepository', ['findActiveEvents'])
eventRepository.findActiveEvents.and.resolveTo([])
const userRepository = jasmine.createSpyObj<UserRepository>('userRepo', ['findById'])

spyOn(MongooseSftpObservationRepository.prototype, 'findAllByStatus').and.resolveTo([])
spyOn(MongooseSftpObservationRepository.prototype, 'findLatestSyncedObservationTime').and.resolveTo(null)
spyOn(ArchiverFactory.prototype, 'createArchiver').and.returnValue(newArchiver(ArchiveStatus.Complete))

const eventHandlers: Record<string, Function[]> = {}
spyOn(SFTPClient.prototype, 'on').and.callFake(function (this: any, event: string, handler: Function) {
if (!eventHandlers[event]) eventHandlers[event] = []
eventHandlers[event].push(handler)
return this
})

const controller = createController(eventRepository, userRepository)
await controller.start()
expect(controller.getStatus().connected).toBe(true)

// Stop the controller first
await controller.stop()
expect(connectSpy).toHaveBeenCalledTimes(1)

// Trigger close handler - should be ignored since controller is stopped
eventHandlers['close']?.forEach(h => h())

// No additional reconnect attempts
clock.tick(60000)
await flushMicrotasks()
expect(connectSpy).toHaveBeenCalledTimes(1)
})
})
})
Loading
Loading