Commit 57b2da9

cgivre and mbeckerle authored
DRILL-8474: Add Daffodil Format Plugin to Drill (#2989)
Co-authored-by: Michael Beckerle <mbeckerle@apache.org>
1 parent 4cdd9b6 commit 57b2da9

64 files changed

Lines changed: 4800 additions & 78 deletions

contrib/format-daffodil/README.md

Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
# Daffodil Format Reader
This plugin enables Drill to read DFDL-described data from files by way of the Apache Daffodil DFDL implementation.

## Configuration:
To use Daffodil schemata, simply add the following to the `formats` section of a file-based storage plugin:

```json
"daffodil": {
  "type": "daffodil",
  "extensions": [
    "dat"
  ]
}
```
There are four other optional parameters which you can specify:
* `schemaURI`: Pre-compiled DFDL schema (.bin extension) or DFDL schema source (.xsd extension)
* `validationMode`: Use `true` to request Daffodil built-in limited validation. Use `false` for no validation.
* `rootName`: Local name of root element of the message. Can be `null` to use the first element declaration of the primary schema file. Ignored if reloading a pre-compiled schema.
* `rootNameSpace`: Namespace URI as a string. Can be `null` to use the target namespace of the primary schema file or if it is unambiguous what element is the rootName. Ignored if reloading a pre-compiled schema.

## Usage:

## Limitations:
At the moment, the DFDL schema is found on the local file system, which won't support Drill's distributed architecture.

There are restrictions on the DFDL schemas that this can handle. In particular, all element children must have distinct element names, including across choice branches. Unfortunately, this rules out a number of large DFDL schemas.

TBD: Auto renaming as part of the Daffodil-to-Drill metadata mapping?

The data is parsed fully from its native form into a Drill data structure held in memory. No attempt is made to avoid access to parts of the DFDL-described data that are not needed to answer the query.

If the data is not well-formed, an error occurs and the query fails.

If the data is invalid, and validity checking by Daffodil is enabled, then an error occurs and the query fails.
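The distinct-name restriction described under Limitations can be checked mechanically before a schema is handed to the plugin. The following is only a minimal sketch: it assumes the child element names of one parent (with all choice branches flattened) are already available as a plain list of strings. `ChildNameCheck` and `duplicateNames` are hypothetical names for illustration and are not part of this plugin or of the Daffodil API.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the plugin: reports child element names
// that appear more than once under the same parent.
final class ChildNameCheck {
  private ChildNameCheck() {}

  /**
   * Returns each name that occurs more than once in childNames, listed once,
   * in the order in which its first repeat is encountered.
   */
  static List<String> duplicateNames(List<String> childNames) {
    Set<String> seen = new LinkedHashSet<>();
    Set<String> duplicates = new LinkedHashSet<>();
    for (String name : childNames) {
      // Set.add returns false when the name was already present.
      if (!seen.add(name)) {
        duplicates.add(name);
      }
    }
    return new ArrayList<>(duplicates);
  }
}
```

An empty result means the children of that parent satisfy the restriction; a non-empty result lists the names that would collide when mapped to Drill columns.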

contrib/format-daffodil/pom.xml

Lines changed: 94 additions & 0 deletions
@@ -0,0 +1,94 @@
<?xml version="1.0"?>
<!--

    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements. See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership. The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <artifactId>drill-contrib-parent</artifactId>
    <groupId>org.apache.drill.contrib</groupId>
    <version>1.23.0-SNAPSHOT</version>
  </parent>

  <artifactId>drill-format-daffodil</artifactId>
  <name>Drill : Contrib : Format : Daffodil</name>

  <dependencies>
    <dependency>
      <groupId>org.apache.drill.exec</groupId>
      <artifactId>drill-java-exec</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.daffodil</groupId>
      <artifactId>daffodil-japi_2.13</artifactId>
      <version>${daffodil.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.daffodil</groupId>
      <artifactId>daffodil-runtime1_2.13</artifactId>
      <version>${daffodil.version}</version>
    </dependency>
    <!-- Test dependencies -->
    <dependency>
      <groupId>org.apache.drill.exec</groupId>
      <artifactId>drill-java-exec</artifactId>
      <classifier>tests</classifier>
      <version>${project.version}</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.drill</groupId>
      <artifactId>drill-common</artifactId>
      <classifier>tests</classifier>
      <version>${project.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-resources-plugin</artifactId>
        <executions>
          <execution>
            <id>copy-java-sources</id>
            <phase>process-sources</phase>
            <goals>
              <goal>copy-resources</goal>
            </goals>
            <configuration>
              <outputDirectory>${basedir}/target/classes/org/apache/drill/exec/store/daffodil
              </outputDirectory>
              <resources>
                <resource>
                  <directory>src/main/java/org/apache/drill/exec/store/daffodil</directory>
                  <filtering>true</filtering>
                </resource>
              </resources>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
Lines changed: 197 additions & 0 deletions
@@ -0,0 +1,197 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.drill.exec.store.daffodil;

import org.apache.daffodil.japi.DataProcessor;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Objects;

import static org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory.CompileFailure;
import static org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;

public class DaffodilBatchReader implements ManagedReader {

  private static final Logger logger = LoggerFactory.getLogger(DaffodilBatchReader.class);
  private final RowSetLoader rowSetLoader;
  private final CustomErrorContext errorContext;
  private final DaffodilMessageParser dafParser;
  private final InputStream dataInputStream;

  public DaffodilBatchReader(DaffodilReaderConfig readerConfig, EasySubScan scan,
      FileSchemaNegotiator negotiator) {

    errorContext = negotiator.parentErrorContext();
    DaffodilFormatConfig dafConfig = readerConfig.plugin.getConfig();

    String schemaFile = dafConfig.getSchemaFile();
    String schemaURIString = dafConfig.getSchemaURI();
    String rootName = dafConfig.getRootName();
    String rootNamespace = dafConfig.getRootNamespace();
    boolean validationMode = dafConfig.getValidationMode();

    // Determine the schema URI:
    // - If schemaFile is provided, it takes precedence and is looked up in the registry area
    // - Otherwise, use schemaURI (full path)
    URI dfdlSchemaURI;
    try {
      if (schemaFile != null && !schemaFile.isEmpty()) {
        // schemaFile takes precedence - construct path from registry area
        Path registryArea = readerConfig.plugin.getContext()
            .getRemoteDaffodilSchemaRegistry().getRegistryArea();
        Path schemaPath = new Path(registryArea, schemaFile);
        dfdlSchemaURI = schemaPath.toUri();
      } else if (schemaURIString != null && !schemaURIString.isEmpty()) {
        // Use the provided schemaURI
        dfdlSchemaURI = new URI(schemaURIString);
      } else {
        // Neither provided - will result in empty URI
        dfdlSchemaURI = new URI("");
      }
    } catch (URISyntaxException e) {
      throw UserException.validationError(e).build(logger);
    }

    FileDescrip file = negotiator.file();
    DrillFileSystem fs = file.fileSystem();
    URI fsSchemaURI = fs.getUri().resolve(dfdlSchemaURI);

    DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
    DataProcessor dp;
    try {
      dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, rootNamespace);
    } catch (CompileFailure e) {
      throw UserException.dataReadError(e)
          .message(String.format("Failed to get Daffodil DFDL processor for: %s", fsSchemaURI))
          .addContext(errorContext).addContext(e.getMessage()).build(logger);
    }
    // Create the corresponding Drill schema.
    // Note: this could be a very large schema. Think of a large complex RDBMS schema,
    // all of it, hundreds of tables, but all part of the same metadata tree.
    TupleMetadata drillSchema = daffodilDataProcessorToDrillSchema(dp);
    // Inform Drill about the schema
    negotiator.tableSchema(drillSchema, true);

    //
    // DATA TIME: Next we construct the runtime objects, and open files.
    //
    // We get the DaffodilMessageParser, which is a stateful driver for daffodil that
    // actually does the parsing.
    rowSetLoader = negotiator.build().writer();

    // We construct the Daffodil InfosetOutputter which the daffodil parser uses to
    // convert infoset event calls to fill in a Drill row via a rowSetLoader.
    DaffodilDrillInfosetOutputter outputter = new DaffodilDrillInfosetOutputter(rowSetLoader);

    // Now we can set up the dafParser with the outputter it will drive with
    // the parser-produced infoset.
    dafParser = new DaffodilMessageParser(dp); // needs further initialization after this.
    dafParser.setInfosetOutputter(outputter);

    Path dataPath = file.split().getPath();
    // Lastly, we open the data stream
    try {
      dataInputStream = fs.openPossiblyCompressedStream(dataPath);
    } catch (IOException e) {
      throw UserException.dataReadError(e)
          .message(String.format("Failed to open input file: %s", dataPath.toString()))
          .addContext(errorContext).addContext(e.getMessage()).build(logger);
    }
    // And lastly,... tell daffodil the input data stream.
    dafParser.setInputStream(dataInputStream);
  }

  /**
   * This is the core of actual processing - data movement from Daffodil to Drill.
   * <p>
   * If there is space in the batch, and there is data available to parse then this calls the
   * daffodil parser, which parses data, delivering it to the rowWriter by way of the infoset
   * outputter.
   * <p>
   * Repeats until the rowWriter is full (a batch is full), or there is no more data, or a parse
   * error ends execution with a throw.
   * <p>
   * Validation errors and other warnings are not errors and are logged but do not cause parsing to
   * fail/throw.
   *
   * @return true if there are rows retrieved, false if no rows were retrieved, which means no more
   *     will ever be retrieved (end of data).
   * @throws RuntimeException
   *     on parse errors.
   */
  @Override
  public boolean next() {
    // Check assumed invariants
    // We don't know if there is data or not. This could be called on an empty data file.
    // We DO know that this won't be called if there is no space in the batch for even 1
    // row.
    if (dafParser.isEOF()) {
      return false; // return without even checking for more rows or trying to parse.
    }
    while (rowSetLoader.start() && !dafParser.isEOF()) { // we never zero-trip this loop.
      // the predicate is always true once.
      dafParser.parse();
      if (dafParser.isProcessingError()) {
        assert (Objects.nonNull(dafParser.getDiagnostics()));
        throw UserException.dataReadError().message(dafParser.getDiagnosticsAsString())
            .addContext(errorContext).build(logger);
      }
      if (dafParser.isValidationError()) {
        logger.warn(dafParser.getDiagnosticsAsString());
        // Note that even if daffodil is set to not validate, validation errors may still occur
        // from DFDL's "recoverableError" assertions.
      }
      rowSetLoader.save();
    }
    int nRows = rowSetLoader.rowCount();
    assert nRows > 0; // This cannot be zero. If the parse failed we will have already thrown out
    // of here.
    return true;
  }

  @Override
  public void close() {
    AutoCloseables.closeSilently(dataInputStream);
  }
}

class DaffodilReaderConfig {
  final DaffodilFormatPlugin plugin;

  DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
    this.plugin = plugin;
  }
}
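
Review note: the constructor above picks the schema location with a fixed precedence: a non-empty `schemaFile` (looked up in the registry area) wins over `schemaURI`, and if neither is set the URI is empty. The sketch below isolates that precedence so it can be exercised without Drill or Hadoop on the classpath. It is an approximation under stated assumptions: `SchemaUriChooser` and `choose` are hypothetical names, the registry area is passed in as a plain string, plain `java.net.URI` resolution stands in for Hadoop's `Path(parent, child)`, and `URI.create` is used for brevity where the commit uses `new URI(...)` and maps `URISyntaxException` to a `UserException`.

```java
import java.net.URI;

// Hypothetical stand-alone sketch of the schema-location precedence used in
// DaffodilBatchReader; not the code from the commit.
final class SchemaUriChooser {
  private SchemaUriChooser() {}

  /**
   * Chooses the schema URI: schemaFile under registryArea if schemaFile is
   * non-empty, else schemaUri if non-empty, else an empty URI.
   */
  static URI choose(String registryArea, String schemaFile, String schemaUri) {
    if (schemaFile != null && !schemaFile.isEmpty()) {
      // Ensure the base ends with '/' so resolve() appends rather than
      // replacing the last path segment.
      String base = registryArea.endsWith("/") ? registryArea : registryArea + "/";
      return URI.create(base).resolve(schemaFile);
    }
    if (schemaUri != null && !schemaUri.isEmpty()) {
      return URI.create(schemaUri);
    }
    // Neither setting was provided.
    return URI.create("");
  }
}
```

With a registry area of `hdfs://nn:8020/registry` (an invented example value), a `schemaFile` of `s.dfdl.xsd` yields a URI under that directory regardless of what `schemaURI` holds, which matches the precedence stated in the constructor comments.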
