Skip to content

Commit 292b4a0

Browse files
committed
Use pipeline/streaming instead of reading in Values
This still parses each input into an in-memory DICOM object, but at least binary input and streams of values are now consumed as streams instead of being collected into a single Value first.
1 parent 47c0234 commit 292b4a0

4 files changed

Lines changed: 188 additions & 327 deletions

File tree

src/plugin.rs

Lines changed: 54 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@ use dicom::object::DefaultDicomObject;
1010
use dicom::object::StandardDataDictionary;
1111
use indexmap::IndexMap;
1212
use nu_plugin::{EngineInterface, Plugin, PluginCommand};
13-
use nu_protocol::{Category, Example, LabeledError, PipelineData, Record, ShellError, Signature, Span, SyntaxShape, Value};
13+
use nu_protocol::{
14+
Category, Example, IntoInterruptiblePipelineData, IntoPipelineData, LabeledError, PipelineData, Record, ShellError, Signals, Signature, Span,
15+
SyntaxShape, Value,
16+
};
1417

15-
#[derive(Default)]
18+
#[derive(Default, Clone)]
1619
pub struct DcmPlugin {
1720
pub dcm_dictionary: StandardDataDictionary,
1821
}
@@ -95,7 +98,7 @@ impl PluginCommand for DcmPluginCommand {
9598
.extra_description("Parse DICOM objects from files or binary data. Invalid DICOM objects are reported as errors and excluded from the output unless --error flag is used.")
9699
}
97100

98-
fn examples(&self) -> Vec<Example> {
101+
fn examples(&self) -> Vec<Example<'_>> {
99102
vec![
100103
Example {
101104
description: "Parse a DICOM file by passing binary data",
@@ -124,59 +127,74 @@ impl PluginCommand for DcmPluginCommand {
124127
.get_current_dir()
125128
.map(PathBuf::from);
126129

127-
let span = input
130+
let input_span = input
128131
.span()
129132
.unwrap_or(call.head);
130133
let input_metadata = input.metadata();
131134

132-
// Convert input PipelineData to Value. The default behaviour for BinaryStream is to detect the type of the stream and if unknown, try to read it as a UTF-8 string.
133-
// This behaviour is wrong for us, since we want to read any binary input stream as binary data.
134-
// Unfortunately, a list of `[(`open file.dcm`), ...]` gets converted to a list of strings by nushell before we can access it.
135-
let input = if let PipelineData::ByteStream(byte_stream, ..) = input {
136-
// TODO eventually support streaming?
137-
let bytes = byte_stream.into_bytes()?;
138-
Value::binary(bytes, span)
139-
} else {
140-
// convert PipelineData to Value the usual way
141-
input.into_value(span)?
142-
};
143-
144-
let error_column = call.get_flag_value("error");
145-
let error_column = if let Some(Value::String { val, .. }) = error_column {
146-
Some(val)
147-
} else {
148-
None
149-
};
135+
let error_column = call.get_flag::<String>("error")?;
150136

151137
// run
152-
let output = self.run_filter(plugin, current_dir.as_deref(), &input, error_column)?;
138+
// TODO find a way without cloning? ListStream::map() requires 'static lifetime, maybe I can use iterators directly?
139+
let output = self.process_pipeline_data(plugin.clone(), current_dir, error_column, &input_span, input)?;
153140

154141
// Forward DataSource metadata from input to output, but clear any content type. This keeps the source.
155142
let output_metadata = input_metadata.map(|m| m.with_content_type(None));
156143

157-
Ok(PipelineData::Value(output, output_metadata))
144+
Ok(output.set_metadata(output_metadata))
158145
}
159146
}
160147

161148
impl DcmPluginCommand {
162-
pub fn run_filter(
149+
pub fn process_pipeline_data(
163150
&self,
164-
plugin: &DcmPlugin,
165-
current_dir: Result<&Path, &ShellError>,
166-
value: &Value,
151+
plugin: DcmPlugin,
152+
current_dir: Result<PathBuf, ShellError>,
167153
error_column: Option<String>,
168-
) -> Result<Value, LabeledError> {
169-
self.process_value(plugin, current_dir, value, &error_column)
154+
input_span: &Span,
155+
input: PipelineData,
156+
) -> Result<PipelineData, LabeledError> {
157+
match input {
158+
// no-op
159+
PipelineData::Empty => Ok(PipelineData::Empty),
160+
161+
// process value directly
162+
PipelineData::Value(value, ..) => {
163+
Self::process_value(&plugin, current_dir.as_deref(), &value, &error_column).map(Value::into_pipeline_data)
164+
}
165+
166+
// map list of values one by one
167+
PipelineData::ListStream(list_stream, ..) => {
168+
// TODO should this fail immediately or generate errors?
169+
let mapped_stream = list_stream.map(move |v| match Self::process_value(&plugin, current_dir.as_deref(), &v, &error_column) {
170+
Ok(value) => value,
171+
Err(e) => Value::error(e.into(), v.span()),
172+
});
173+
174+
Ok(mapped_stream.into_pipeline_data(*input_span, Signals::EMPTY))
175+
}
176+
177+
// process input bytestream directly without collecting it into memory
178+
PipelineData::ByteStream(byte_stream, ..) => {
179+
let byte_stream_reader = byte_stream
180+
.reader()
181+
.ok_or_else(|| LabeledError::new("Empty bytestream"))?;
182+
183+
let obj = read_dcm_stream(byte_stream_reader)
184+
.map_err(|e| LabeledError::new("Invalid DICOM data").with_label(e.to_string(), *input_span))?;
185+
186+
Self::process_dicom_object(&plugin, input_span, obj, &error_column).map(Value::into_pipeline_data)
187+
}
188+
}
170189
}
171190

172191
fn process_value(
173-
&self,
174192
plugin: &DcmPlugin,
175193
current_dir: Result<&Path, &ShellError>,
176194
value: &Value,
177195
error_column: &Option<String>,
178196
) -> Result<Value, LabeledError> {
179-
let result = self.process_value_with_normal_error(plugin, current_dir, value, error_column);
197+
let result = Self::process_value_with_normal_error(plugin, current_dir, value, error_column);
180198

181199
// TODO better value.span().unwrap()
182200
match (error_column, &result) {
@@ -198,7 +216,6 @@ impl DcmPluginCommand {
198216
}
199217

200218
fn process_value_with_normal_error(
201-
&self,
202219
plugin: &DcmPlugin,
203220
current_dir: Result<&Path, &ShellError>,
204221
value: &Value,
@@ -224,7 +241,7 @@ impl DcmPluginCommand {
224241
LabeledError::new("`dcm` expects valid DICOM binary data").with_label(text, *internal_span)
225242
})?;
226243

227-
self.process_dicom_object(plugin, internal_span, obj, error_column)
244+
Self::process_dicom_object(plugin, internal_span, obj, error_column)
228245
}
229246
Value::Record { val, internal_span } => {
230247
// Check if a file record
@@ -243,7 +260,7 @@ impl DcmPluginCommand {
243260
LabeledError::new("`dcm` expects valid DICOM binary data").with_label(text, *internal_span)
244261
})?;
245262

246-
return self.process_dicom_object(plugin, internal_span, obj, error_column);
263+
return Self::process_dicom_object(plugin, internal_span, obj, error_column);
247264
}
248265
}
249266

@@ -265,16 +282,13 @@ impl DcmPluginCommand {
265282
let cursor = Cursor::new(val);
266283
let obj = read_dcm_stream(cursor).map_err(|e| LabeledError::new("Invalid DICOM data").with_label(e.to_string(), *internal_span))?;
267284

268-
self.process_dicom_object(plugin, internal_span, obj, error_column)
285+
Self::process_dicom_object(plugin, internal_span, obj, error_column)
269286
}
270287
Value::List { vals, internal_span, .. } => {
271288
// Use either a dicom result or an error for each input element
272289
let result: Vec<Value> = vals
273290
.iter()
274-
.map(|v| {
275-
self.process_value(plugin, current_dir, v, error_column)
276-
.unwrap_or_else(|e| Value::error(e.into(), *internal_span))
277-
})
291+
.map(|v| Self::process_value(plugin, current_dir, v, error_column).unwrap_or_else(|e| Value::error(e.into(), *internal_span)))
278292
.collect();
279293

280294
Ok(Value::list(result, *internal_span))
@@ -285,7 +299,6 @@ impl DcmPluginCommand {
285299
}
286300

287301
fn process_dicom_object(
288-
&self,
289302
plugin: &DcmPlugin,
290303
span: &Span,
291304
obj: DefaultDicomObject,

src/reader.rs

Lines changed: 31 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{
22
fs::File,
3-
io::{BufReader, ErrorKind, Read, Seek, SeekFrom},
3+
io::{BufReader, Cursor, Read},
44
path::Path,
55
};
66

@@ -13,14 +13,8 @@ pub enum Error {
1313
#[snafu(display("Could not read Dicom object: {}", source))]
1414
Io { source: std::io::Error },
1515

16-
#[snafu(display("Could not read Dicom preamble"))]
17-
Preamble,
18-
1916
#[snafu(display("Could not parse Dicom object: {}", source))]
2017
Dcm { source: dicom_object::ReadError },
21-
22-
#[snafu(display("Could not parse Dicom object (no preamble?): {}", source))]
23-
DcmNoPreamble { source: dicom_object::ReadError },
2418
}
2519

2620
pub fn read_dcm_file<P: AsRef<Path>>(path: P) -> Result<DefaultDicomObject, Error> {
@@ -29,44 +23,37 @@ pub fn read_dcm_file<P: AsRef<Path>>(path: P) -> Result<DefaultDicomObject, Erro
2923
read_dcm_stream(input)
3024
}
3125

32-
pub fn read_dcm_stream<F: Seek + Read>(mut input: F) -> Result<DefaultDicomObject, Error> {
33-
// TODO use lower level Dicom functions to avoid seeking back and forth and double-wrapping BufReaders
34-
35-
// read the first 128 + 4 bytes and check if DICM
36-
let mut buf = [0u8; 128 + 4];
37-
38-
match input.read_exact(&mut buf) {
39-
Ok(_) => {
40-
// check if DICM
41-
if buf[128..132] == *b"DICM" {
42-
// need to rewind back 4 to get to the beginning of DICM again
43-
input
44-
.seek(SeekFrom::Current(-4))
45-
.context(IoSnafu)?;
46-
47-
return read_dcm_stream_without_pixel_data(input).context(DcmSnafu);
48-
}
49-
}
50-
Err(e) => {
51-
// if seek error, fall through and try to read without preamble, otherwise fail now
52-
if e.kind() != ErrorKind::UnexpectedEof {
53-
return Err(Error::Io { source: e });
54-
}
55-
}
56-
}
57-
58-
// Rewind to the start and try to read without the preamble
26+
pub fn read_dcm_stream<F: Read>(mut input: F) -> Result<DefaultDicomObject, Error> {
27+
// Read the first 132 bytes into a temporary buffer to check for the preamble.
28+
let mut buf = Vec::with_capacity(132);
5929
input
60-
.seek(SeekFrom::Start(0))
30+
.by_ref()
31+
.take(132)
32+
.read_to_end(&mut buf)
6133
.context(IoSnafu)?;
6234

63-
// TODO this will always fail -- dicom.rs needs DICM magic to read meta
64-
read_dcm_stream_without_pixel_data(input).context(DcmNoPreambleSnafu)
65-
}
66-
67-
fn read_dcm_stream_without_pixel_data<F: Read>(input: F) -> Result<DefaultDicomObject, dicom_object::ReadError> {
68-
dicom_object::OpenFileOptions::new()
69-
.read_until(dicom::dictionary_std::tags::PIXEL_DATA)
70-
.read_preamble(dicom_object::file::ReadPreamble::Never)
71-
.from_reader(input)
35+
if buf.len() == 132 && &buf[128..132] == b"DICM" {
36+
// "DICM" marker found. The data to parse starts with these 4 bytes.
37+
// We create a new reader by chaining the "DICM" marker from our buffer
38+
// with the rest of the original input stream.
39+
let reader = Cursor::new(&buf[128..]).chain(input);
40+
41+
// Use the default OpenFileOptions to parse the File Meta Information.
42+
dicom_object::OpenFileOptions::new()
43+
.read_until(dicom::dictionary_std::tags::PIXEL_DATA)
44+
.read_preamble(dicom_object::file::ReadPreamble::Never)
45+
.from_reader(reader)
46+
.context(DcmSnafu)
47+
} else {
48+
// No "DICM" marker. The entire buffer is part of the dataset.
49+
// Create a reader from the buffer and chain it with the rest of the stream.
50+
let reader = Cursor::new(buf).chain(input);
51+
52+
// Attempt to parse as a dataset without a preamble.
53+
dicom_object::OpenFileOptions::new()
54+
.read_until(dicom::dictionary_std::tags::PIXEL_DATA)
55+
.read_preamble(dicom_object::file::ReadPreamble::Never)
56+
.from_reader(reader)
57+
.context(DcmSnafu)
58+
}
7259
}

0 commit comments

Comments
 (0)