-
-
Notifications
You must be signed in to change notification settings - Fork 118
Expand file tree
/
Copy pathFileConnector.java
More file actions
148 lines (131 loc) · 4.96 KB
/
Copy pathFileConnector.java
File metadata and controls
148 lines (131 loc) · 4.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package org.myrobotlab.service;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Date;
import org.myrobotlab.document.Document;
import org.myrobotlab.document.connector.AbstractConnector;
import org.myrobotlab.document.connector.ConnectorState;
import org.myrobotlab.document.transformer.ConnectorConfig;
import org.myrobotlab.logging.LoggerFactory;
import org.myrobotlab.service.config.FileConnectorConfig;
import org.myrobotlab.service.config.ServiceConfig;
import org.myrobotlab.service.interfaces.DocumentPublisher;
import org.slf4j.Logger;
/**
 * Connector that walks a directory tree and feeds one {@link Document} per
 * file it encounters.
 *
 * <p>
 * The class is its own {@link FileVisitor}: {@link #startCrawling()} hands
 * {@code this} to {@link Files#walkFileTree(Path, FileVisitor)} and the visit
 * callbacks below build the documents. The root of the walk is
 * {@code config.directory}.
 */
public class FileConnector extends AbstractConnector implements DocumentPublisher, FileVisitor<Path> {

  public final static Logger log = LoggerFactory.getLogger(FileConnector.class.getCanonicalName());
  private static final long serialVersionUID = 1L;

  /** Local connector settings; {@code directory} is the root of the crawl. */
  private FileConnectorConfig config = new FileConnectorConfig();
  // TODO: add wildcard includes/excludes
  // TODO: add file path includes/excludes

  /**
   * Stop request flag: set by {@link #stopCrawling()}, checked by the file
   * visit callbacks, cleared when a new crawl starts. Declared volatile so a
   * write made by one thread is visible to the thread running the walk.
   */
  private volatile boolean interrupted = false;

  /**
   * Creates the connector.
   *
   * @param name
   *          forwarded to the superclass constructor
   * @param id
   *          forwarded to the superclass constructor
   */
  public FileConnector(String name, String id) {
    super(name, id);
  }

  /**
   * Not implemented: only logs that fact, the argument is ignored.
   *
   * @param config
   *          ignored
   */
  @Override
  public void setConfig(ConnectorConfig config) {
    // TODO Auto-generated method stub
    log.info("Set Config not yet implemented");
  }

  /**
   * Walks the configured directory tree on the calling thread, feeding one
   * document per visited file, then publishes a flush and marks the connector
   * stopped.
   *
   * <p>
   * An {@link IOException} raised by the walk is logged and ends the walk; the
   * flush and the state change still happen.
   */
  @Override
  public void startCrawling() {
    // Clear a stop request left over from an earlier run; without this every
    // crawl after the first stopCrawling() would terminate at its first file.
    interrupted = false;
    state = ConnectorState.RUNNING;
    setStart(System.currentTimeMillis());
    Path startPath = Paths.get(config.directory);
    log.info("Started Crawling {}", startPath);
    try {
      Files.walkFileTree(startPath, this);
    } catch (IOException e) {
      // Report through the logger (with the start path) rather than stderr.
      log.error("I/O error while crawling {}", startPath, e);
    }
    // we're done.. publish a flush so other down stream components know to
    // flush any partial batches they might have.
    invoke("publishFlush");
    log.info("File Connector finished walking the tree.");
    // TODO: should we flush here immediately?
    // NOTE(review): this also replaces INTERRUPTED with STOPPED when the walk
    // was cut short by stopCrawling() - confirm that is the intended end state.
    state = ConnectorState.STOPPED;
  }

  /**
   * Requests that a running crawl stop. The walk itself ends at the next file
   * visit callback, which sees the flag and returns
   * {@link FileVisitResult#TERMINATE}.
   */
  @Override
  public void stopCrawling() {
    log.info("Stop crawling requested...");
    interrupted = true;
    state = ConnectorState.INTERRUPTED;
  }

  /**
   * Every directory is entered; no filtering is applied.
   *
   * @return always {@link FileVisitResult#CONTINUE}
   */
  @Override
  public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
    return FileVisitResult.CONTINUE;
  }

  /**
   * Builds a document describing {@code file} and feeds it.
   *
   * <p>
   * The document id is the doc id prefix followed by the file's absolute path.
   * Fields set: {@code last_modified}, {@code created_date}, {@code filepath},
   * {@code size} and {@code type} (always {@code "file"}).
   *
   * @param file
   *          the file being visited
   * @param attrs
   *          attributes supplied by the tree walk
   * @return {@link FileVisitResult#TERMINATE} if a stop was requested,
   *         otherwise {@link FileVisitResult#CONTINUE}
   */
  @Override
  public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
    if (interrupted) {
      return terminateCrawl();
    }
    // Resolve once; the same string is logged, used in the id and stored.
    String absolutePath = file.toFile().getAbsolutePath();
    log.info("Crawling {} Feed Count {}", absolutePath, getFeedCount());
    Document doc = new Document(getDocIdPrefix() + absolutePath);
    doc.setField("last_modified", new Date(attrs.lastModifiedTime().toMillis()));
    doc.setField("created_date", new Date(attrs.creationTime().toMillis()));
    doc.setField("filepath", absolutePath);
    doc.setField("size", attrs.size());
    doc.setField("type", "file");
    // TODO: potentially add a byte array of the file
    // or maybe an input stream or other handle to the file.
    feed(doc);
    return FileVisitResult.CONTINUE;
  }

  /**
   * Handles a file that could not be visited: feeds a document carrying the
   * exception in its {@code error} field, logs the failure and lets the walk
   * continue.
   *
   * @param file
   *          the file that could not be visited
   * @param exc
   *          the error reported by the tree walk
   * @return {@link FileVisitResult#TERMINATE} if a stop was requested,
   *         otherwise {@link FileVisitResult#CONTINUE}
   */
  @Override
  public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
    if (interrupted) {
      return terminateCrawl();
    }
    String absolutePath = file.toFile().getAbsolutePath();
    log.warn("Failed Crawling {} Feed Count {}", absolutePath, getFeedCount());
    Document doc = new Document(getDocIdPrefix() + absolutePath);
    doc.setField("type", "file");
    // TODO: how does this serialize?
    doc.setField("error", exc);
    feed(doc);
    log.warn("Exception processing {}", file, exc);
    // Keep going!!!
    return FileVisitResult.CONTINUE;
  }

  /**
   * Rethrows an error raised while iterating a directory, which aborts the
   * walk; otherwise continues.
   *
   * @param dir
   *          the directory that was just finished
   * @param exc
   *          error from iterating the directory, or {@code null} if none
   * @return {@link FileVisitResult#CONTINUE} when {@code exc} is {@code null}
   * @throws IOException
   *           {@code exc} itself, when it is not {@code null}
   */
  @Override
  public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
    if (exc != null) {
      throw exc;
    }
    return FileVisitResult.CONTINUE;
  }

  /**
   * @return the directory the crawl starts from
   */
  public String getDirectory() {
    return config.directory;
  }

  /**
   * @param directory
   *          the directory the next crawl should start from
   */
  public void setDirectory(String directory) {
    config.directory = directory;
  }

  /**
   * Delegates to the superclass and returns the argument unchanged.
   *
   * <p>
   * NOTE(review): the local {@code config} field is not updated here, so a
   * directory carried by {@code c} does not reach {@link #startCrawling()}
   * through this method - confirm whether that is intended.
   *
   * @param c
   *          the configuration to apply
   * @return {@code c}
   */
  @Override
  public ServiceConfig apply(ServiceConfig c) {
    super.apply(c);
    // anything else?
    return c;
  }

  /**
   * Returns the superclass configuration with its {@code directory} overwritten
   * by the locally held value.
   *
   * @return the superclass config object, cast to {@link FileConnectorConfig}
   */
  @Override
  public ServiceConfig getConfig() {
    // we need the super stuff here.
    FileConnectorConfig superConfig = (FileConnectorConfig) super.getConfig();
    // this is goofy.. the directory lives in the local config, so copy it over
    superConfig.directory = this.config.directory;
    return superConfig;
  }

  /** Records an interrupted crawl and tells the tree walk to stop. */
  private FileVisitResult terminateCrawl() {
    log.info("Interrupted, terminating crawl.");
    state = ConnectorState.INTERRUPTED;
    return FileVisitResult.TERMINATE;
  }
}