Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import FolderIcon from '/icons/solar/Folder.svg?react'
import { filesystemService } from '~/services/filesystem-service'
import type { FilesystemEntry } from '~/types/filesystem.types'
import { ApiError } from '~/utils/api'
import { useDirectoryWatcher } from '~/hooks/use-file-watcher'
import Button from '../inputs/button'

interface DirectoryPickerProperties {
Expand Down Expand Up @@ -47,6 +48,8 @@ export default function DirectoryPicker({
}
}, [])

useDirectoryWatcher(isOpen ? currentPath : undefined, () => void loadEntries(currentPath))

const handleNavigateUp = () => {
loadEntries(parentPath)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import CodeFileIcon from '../../../icons/solar/Code File.svg?react'
import TrashBinIcon from '../../../icons/solar/Trash Bin.svg?react'
import Pen from '../../../icons/solar/Pen.svg?react'
import { useShortcut } from '~/hooks/use-shortcut'
import { useFileWatcher } from '~/hooks/use-file-watcher'
import type { ContextMenuState } from './use-file-tree-context-menu'

import {
Expand Down Expand Up @@ -61,6 +62,10 @@ export default function EditorFileStructure() {
expandedItemsRef.current = editorExpandedItems
}, [editorExpandedItems])

useFileWatcher(project?.name, () => {
if (dataProvider) void dataProvider.reloadDirectory('root')
})

const onAfterRename = useCallback(
(oldPath: string, newName: string) => {
const tab = getTab(oldPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import '/styles/editor-files.css'
import AltArrowRightIcon from '../../../icons/solar/Alt Arrow Right.svg?react'
import AltArrowDownIcon from '../../../icons/solar/Alt Arrow Down.svg?react'
import { useShortcut } from '~/hooks/use-shortcut'
import { useFileWatcher } from '~/hooks/use-file-watcher'
import type { StudioContextMenuState } from './use-studio-context-menu'

import {
Expand Down Expand Up @@ -68,6 +69,10 @@ export default function StudioFileStructure() {
expandedItemsRef.current = studioExpandedItems
}, [studioExpandedItems])

useFileWatcher(project?.name, () => {
if (dataProvider) void dataProvider.reloadDirectory('root')
})

const studioContextMenu = useStudioContextMenu({
projectName: project?.name,
dataProvider,
Expand Down
31 changes: 31 additions & 0 deletions src/main/frontend/app/hooks/use-file-watcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { useEffect, useRef } from 'react'
import { apiUrl } from '~/utils/api'

function useSseWatcher(url: string | undefined, onFileChange: () => void) {
const callbackRef = useRef(onFileChange)
callbackRef.current = onFileChange

useEffect(() => {
if (!url) return

const eventSource = new EventSource(url)

eventSource.addEventListener('file-change', () => {
callbackRef.current()
})

return () => {
eventSource.close()
}
}, [url])
}

export function useFileWatcher(projectName: string | undefined, onFileChange: () => void) {
const url = projectName ? apiUrl(`/projects/${projectName}/watch`) : undefined
useSseWatcher(url, onFileChange)
}

export function useDirectoryWatcher(path: string | undefined, onFileChange: () => void) {
const url = path ? apiUrl(`/filesystem/watch?path=${encodeURIComponent(path)}`) : undefined
useSseWatcher(url, onFileChange)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
@RequestMapping("/projects/{projectName}")
public class FileTreeController {

private final FileTreeService fileTreeService;
private final FileWatcherService fileWatcherService;

public FileTreeController(FileTreeService fileTreeService) {
public FileTreeController(FileTreeService fileTreeService, FileWatcherService fileWatcherService) {
this.fileTreeService = fileTreeService;
this.fileWatcherService = fileWatcherService;
}

@GetMapping("/tree")
Expand Down Expand Up @@ -52,4 +55,9 @@ public ResponseEntity<FileTreeNode> createFolder(@PathVariable String projectNam
FileTreeNode node = fileTreeService.createFolder(projectName, dto.path());
return ResponseEntity.status(HttpStatus.CREATED.value()).body(node);
}

@GetMapping("/watch")
public SseEmitter watchProject(@PathVariable String projectName) {
return fileWatcherService.subscribeToProject(projectName);
}
}
239 changes: 239 additions & 0 deletions src/main/java/org/frankframework/flow/file/FileWatcherService.java
Comment thread
stijnpotters1 marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
package org.frankframework.flow.file;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.io.IOException;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.log4j.Log4j2;
import org.frankframework.flow.filesystem.FileSystemStorage;
import org.frankframework.flow.project.ConfigurationProject;
import org.frankframework.flow.project.ConfigurationProjectService;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Log4j2
@Service
public class FileWatcherService {

private static final long DEBOUNCE_DELAY_MS = 150;
private static final Set<String> IGNORED_DIRECTORIES = Set.of(".git", "target", "node_modules");

private final FileSystemStorage fileSystemStorage;
private final FileTreeService fileTreeService;
private final ConfigurationProjectService configurationProjectService;

private WatchService watchService;

private final Map<WatchKey, String> watchKeyChannels = new ConcurrentHashMap<>();
private final Map<String, List<SseEmitter>> channelEmitters = new ConcurrentHashMap<>();
private final Map<String, Runnable> channelCallbacks = new ConcurrentHashMap<>();
private final Map<String, ScheduledFuture<?>> pendingBroadcasts = new ConcurrentHashMap<>();

private final ScheduledExecutorService debounceExecutor = Executors.newSingleThreadScheduledExecutor(
Thread.ofVirtual().name("file-watcher-debounce", 0).factory()
);

public FileWatcherService(
FileSystemStorage fileSystemStorage,
FileTreeService fileTreeService,
ConfigurationProjectService configurationProjectService
) {
this.fileSystemStorage = fileSystemStorage;
this.fileTreeService = fileTreeService;
this.configurationProjectService = configurationProjectService;
}

@PostConstruct
public void start() {
if (!fileSystemStorage.isLocalEnvironment()) {
return;
}

try {
watchService = FileSystems.getDefault().newWatchService();
Thread.ofVirtual().name("file-watcher").start(this::watchLoop);
log.info("File watcher service started");
} catch (IOException exception) {
log.error("Failed to start file watcher", exception);
}
}

@PreDestroy
public void stop() {
debounceExecutor.shutdownNow();
if (watchService != null) {
try {
watchService.close();
} catch (IOException exception) {
log.warn("Failed to close watch service", exception);
}
}
}

public SseEmitter subscribeToProject(String projectName) {
if (watchService == null) {
return createEmitter(projectName);
}
try {
ConfigurationProject project = configurationProjectService.getProject(projectName);
Path projectPath = fileSystemStorage.toAbsolutePath(project.getRootPath());
String channelId = projectPath.toString();
channelCallbacks.put(channelId, () -> fileTreeService.invalidateTreeCache(projectName));
registerRecursively(projectPath, channelId);

return createEmitter(channelId);
} catch (Exception exception) {
log.warn("Failed to register project for watching: {}", projectName, exception);
return createEmitter(projectName);
}
}

public SseEmitter subscribeToPath(Path absolutePath) throws IOException {
String channelId = absolutePath.toString();
if (watchService != null && Files.isDirectory(absolutePath)) {
WatchKey key = absolutePath.register(
watchService,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY
);
watchKeyChannels.put(key, channelId);
}
return createEmitter(channelId);
}

private SseEmitter createEmitter(String channelId) {
SseEmitter emitter = new SseEmitter(0L);
channelEmitters.computeIfAbsent(channelId, _ -> new CopyOnWriteArrayList<>()).add(emitter);
Runnable cleanup = () -> removeFromChannel(channelId, emitter);
emitter.onCompletion(cleanup);
emitter.onTimeout(cleanup);
emitter.onError(_ -> cleanup.run());

return emitter;
}

private void removeFromChannel(String channelId, SseEmitter emitter) {
List<SseEmitter> list = channelEmitters.get(channelId);
if (list != null) {
list.remove(emitter);
}
}

private void registerRecursively(Path dir, String channelId) throws IOException {
Files.walkFileTree(dir, new SimpleFileVisitor<>() {

@Override
public FileVisitResult preVisitDirectory(Path directory, BasicFileAttributes attrs) throws IOException {
String name = directory.getFileName() != null ? directory.getFileName().toString() : "";
if (IGNORED_DIRECTORIES.contains(name)) {
return FileVisitResult.SKIP_SUBTREE;
}
WatchKey key = directory.register(
watchService,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY
);
watchKeyChannels.put(key, channelId);
return FileVisitResult.CONTINUE;
}
});
}

private void watchLoop() {
while (!Thread.currentThread().isInterrupted()) {
WatchKey key;
try {
key = watchService.take();
} catch (InterruptedException | ClosedWatchServiceException exception) {
break;
}

String channelId = watchKeyChannels.get(key);
if (channelId == null) {
key.reset();
continue;
}

registerNewSubdirectories(key, channelId);
scheduleBroadcast(channelId);

if (!key.reset()) {
watchKeyChannels.remove(key);
}
}
}

private void registerNewSubdirectories(WatchKey key, String channelId) {
Path watchedDir = (Path) key.watchable();
for (WatchEvent<?> event : key.pollEvents()) {
if (event.kind() != StandardWatchEventKinds.ENTRY_CREATE) {
continue;
}

Path created = watchedDir.resolve(((WatchEvent<Path>) event).context());
if (Files.isDirectory(created)) {
try {
registerRecursively(created, channelId);
} catch (IOException exception) {
log.warn("Failed to register new directory: {}", created);
}
}
}
}

private void scheduleBroadcast(String channelId) {
ScheduledFuture<?> existing = pendingBroadcasts.remove(channelId);
if (existing != null) {
existing.cancel(false);
}
pendingBroadcasts.put(channelId, debounceExecutor.schedule(() -> {
Runnable callback = channelCallbacks.get(channelId);
if (callback != null) {
callback.run();
}

broadcast(channelId);
pendingBroadcasts.remove(channelId);
}, DEBOUNCE_DELAY_MS, TimeUnit.MILLISECONDS));
}

private void broadcast(String channelId) {
List<SseEmitter> emitters = channelEmitters.get(channelId);
if (emitters == null || emitters.isEmpty()) {
return;
}

List<SseEmitter> dead = new ArrayList<>();
for (SseEmitter emitter : emitters) {
try {
emitter.send(SseEmitter.event().name("file-change").data("changed"));
} catch (Exception exception) {
dead.add(emitter);
}
}

emitters.removeAll(dead);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,25 @@

import java.io.IOException;
import java.nio.file.AccessDeniedException;
import org.frankframework.flow.file.FileWatcherService;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
@RequestMapping("/filesystem")
public class FilesystemController {

private final FileSystemStorage fileSystemStorage;
private final FileWatcherService fileWatcherService;

public FilesystemController(FileSystemStorage fileSystemStorage) {
public FilesystemController(FileSystemStorage fileSystemStorage, FileWatcherService fileWatcherService) {
this.fileSystemStorage = fileSystemStorage;
this.fileWatcherService = fileWatcherService;
}

@GetMapping("/browse")
Expand All @@ -28,4 +32,9 @@ public ResponseEntity<BrowseResult> browse(@RequestParam(required = false, defau
return ResponseEntity.status(HttpStatus.FORBIDDEN).build();
}
}

@GetMapping("/watch")
public SseEmitter watch(@RequestParam String path) throws IOException {
return fileWatcherService.subscribeToPath(fileSystemStorage.toAbsolutePath(path));
}
}
Loading