Skip to content

Add DirectoryListener#764

Open
dr0i wants to merge 1 commit into
masterfrom
702-addDirectoryListener
Open

Add DirectoryListener#764
dr0i wants to merge 1 commit into
masterfrom
702-addDirectoryListener

Conversation

@dr0i
Copy link
Copy Markdown
Member

@dr0i dr0i commented Apr 14, 2026

See #702.

This PR works in principle.
Be aware that there is a bug regarding the WatchService that can result in a loss of awareness of created files, e.g. if these files are created too fast in a row, or e.g. by moving a whole directory with files in it to the directory which is listened to: that directory would be watched, but the files in there won't be recognized. So it's not as inotify in unix contexts - it only comes close to it.

You may want to test it like this:

  1. ./gradlew assembleDist (in the root of this branch to build the runner)
  2. cd ./metafacture-runner/build/distributions/
  3. tar xfz metafacture-core-702-addDirectoryListener-SNAPSHOT-dist.tar.gz
  4. create a FLUX:
echo "default infile = FLUX_DIR + "tmp";
infile|
listen-directory|
print
;"  > directoryListener.flux
  1. mkdir tmp (creates the directory to listen to)
  2. metafacture-core-702-addDirectoryListener-SNAPSHOT-dist/flux.sh directoryListener.flux to execute the FLUX
  3. touch 1 2 3 4 5 6 7 tmp/ to create some files

You should see as output the names of the files with absolute path (which could be given in the FLUX to open-file) and some logs (which are not going to the piped flux-command (as e.g. open-file)) are printed to stdout.
(Interestingly , you see that even "touch"ing 7 files consecutively the WatchService is fast enough to observe the creation of these files.)

If you want the listener to go down: trigger it with the specially named file:
touch shutdownEtlNow

We may want to discuss if it's necessary to improve the behaviour by building some workarounds. One idea would be to just traverse the given directory every n-th second and notate the filenames, if any new appear, to a Map and push these down the pipe. This would guarantee to not miss one file (at the cost of not instantly getting the filename if one was created.)

@dr0i
Copy link
Copy Markdown
Member Author

dr0i commented Apr 14, 2026

functional review: @TobiasNx (and maybe @fsteeg as this PR is supposed to be part of a workflow in the RPB context).
discussion (re bug and workaround): also @blackwinter
code review: @blackwinter (or @fsteeg )

@dr0i dr0i requested review from TobiasNx and fsteeg April 14, 2026 14:41
@dr0i dr0i linked an issue Apr 14, 2026 that may be closed by this pull request
@TobiasNx
Copy link
Copy Markdown
Contributor

TobiasNx commented Apr 29, 2026

@fsteeg and I tested it. I followed your example. First

4. create a FLUX:

echo "default infile = FLUX_DIR + "tmp";
infile|
listen-directory|
print
;"  > directoryListener.flux

This creates a broken flux since the echo loses the quotes around tmp.

tobias@hbz-hp:~/temp$ '/home/tobias/git/metafacture-core/metafacture-runner/build/install/metafacture-core/flux.sh' directoryListener.flux
Exception in thread "main" org.metafacture.flux.FluxParseException: Variable tmp not assigned.
	at org.metafacture.flux.parser.FlowBuilder.exp(FlowBuilder.java:604)
	at org.metafacture.flux.parser.FlowBuilder.exp(FlowBuilder.java:619)
	at org.metafacture.flux.parser.FlowBuilder.varDef(FlowBuilder.java:386)
	at org.metafacture.flux.parser.FlowBuilder.varDefs(FlowBuilder.java:287)
	at org.metafacture.flux.parser.FlowBuilder.flux(FlowBuilder.java:105)
	at org.metafacture.flux.FluxCompiler.compileFlow(FluxCompiler.java:66)
	at org.metafacture.flux.FluxCompiler.compile(FluxCompiler.java:54)
	at org.metafacture.runner.Flux.main(Flux.java:87)

After fixing this.
Your example flux with printing seems to work fine when creating or modifing files. The logging is a little too much.

tobias@hbz-hp:~/temp$ '/home/tobias/git/metafacture-core/metafacture-runner/build/install/metafacture-core/flux.sh' directoryListener.flux
Add directory to watch: /home/tobias/temp/tmp
Event kind:ENTRY_CREATE. File affected: 1.
/home/tobias/temp/tmp/1Event kind:ENTRY_MODIFY. File affected: 1.

/home/tobias/temp/tmp/1Event kind:ENTRY_CREATE. File affected: 2.

/home/tobias/temp/tmp/2Event kind:ENTRY_MODIFY. File affected: 2.

/home/tobias/temp/tmp/2Event kind:ENTRY_CREATE. File affected: 3.

/home/tobias/temp/tmp/3Event kind:ENTRY_MODIFY. File affected: 3.

/home/tobias/temp/tmp/3Event kind:ENTRY_CREATE. File affected: 4.

/home/tobias/temp/tmp/4Event kind:ENTRY_MODIFY. File affected: 4.

/home/tobias/temp/tmp/4Event kind:ENTRY_CREATE. File affected: 5.

/home/tobias/temp/tmp/5Event kind:ENTRY_MODIFY. File affected: 5.

/home/tobias/temp/tmp/5Event kind:ENTRY_CREATE. File affected: 6.

/home/tobias/temp/tmp/6Event kind:ENTRY_MODIFY. File affected: 6.

/home/tobias/temp/tmp/6Event kind:ENTRY_CREATE. File affected: 7.

/home/tobias/temp/tmp/7Event kind:ENTRY_MODIFY. File affected: 7.

/home/tobias/temp/tmp/7Event kind:ENTRY_CREATE. File affected: .1.swp.

/home/tobias/temp/tmp/.1.swpEvent kind:ENTRY_MODIFY. File affected: .1.swp.

/home/tobias/temp/tmp/.1.swpEvent kind:ENTRY_CREATE. File affected: .1.swp.

/home/tobias/temp/tmp/.1.swpEvent kind:ENTRY_MODIFY. File affected: .1.swp.

/home/tobias/temp/tmp/.1.swpEvent kind:ENTRY_MODIFY. File affected: 1.

/home/tobias/temp/tmp/1Event kind:ENTRY_CREATE. File affected: shutdownEtlNow.
Event kind:ENTRY_MODIFY. File affected: shutdownEtlNow.

When changing print to write the process closes and the flux breaks when a file is modified or created.

tobias@hbz-hp:~/temp$ '/home/tobias/git/metafacture-core/metafacture-runner/build/install/metafacture-core/flux.sh' directoryListener.flux
Add directory to watch: /home/tobias/temp/tmp
Event kind:ENTRY_CREATE. File affected: .1.swp.
Exception in thread "Thread-0" org.metafacture.framework.MetafactureException: java.io.IOException: Stream closed
	at org.metafacture.io.ObjectFileWriter.process(ObjectFileWriter.java:110)
	at org.metafacture.io.ObjectWriter.process(ObjectWriter.java:147)
	at org.metafacture.io.DirectoryListener$DirectoryWatcher.processFile(DirectoryListener.java:195)
	at org.metafacture.io.DirectoryListener$DirectoryWatcher.run(DirectoryListener.java:166)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.io.IOException: Stream closed
	at java.base/sun.nio.cs.StreamEncoder.ensureOpen(StreamEncoder.java:51)
	at java.base/sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
	at java.base/sun.nio.cs.StreamEncoder.write(StreamEncoder.java:142)
	at java.base/java.io.OutputStreamWriter.write(OutputStreamWriter.java:223)
	at java.base/java.io.Writer.write(Writer.java:249)
	at org.metafacture.io.ObjectFileWriter.process(ObjectFileWriter.java:101)
	... 4 more

What I currently also do not understand is how I am able to recognize the filenames to further process the changed or created files. What is the output of listen-directory?

@fsteeg fsteeg removed request for TobiasNx and fsteeg April 29, 2026 13:32
@fsteeg fsteeg assigned dr0i and unassigned fsteeg and TobiasNx Apr 29, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add possibility to listen on a directory

3 participants