forked from indygreg/python-zstandard
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdecompressor_iterator.rs
More file actions
110 lines (91 loc) · 3.11 KB
/
decompressor_iterator.rs
File metadata and controls
110 lines (91 loc) · 3.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// Copyright (c) 2021-present, Gregory Szorc
// All rights reserved.
//
// This software may be modified and distributed under the terms
// of the BSD license. See the LICENSE file for details.
use {
crate::{
exceptions::ZstdError,
stream::{make_in_buffer_source, InBufferSource},
zstd_safe::DCtx,
},
pyo3::{exceptions::PyValueError, prelude::*, types::PyBytes, IntoPyObjectExt},
std::{cmp::min, sync::Arc},
};
#[pyclass(module = "zstandard.backend_rust")]
pub struct ZstdDecompressorIterator {
dctx: Arc<DCtx<'static>>,
source: Box<dyn InBufferSource + Send>,
write_size: usize,
finished_output: bool,
}
unsafe impl Sync for ZstdDecompressorIterator {}
#[pymethods]
impl ZstdDecompressorIterator {
// PyIterProtocol.
fn __iter__(slf: PyRef<Self>) -> PyRef<Self> {
slf
}
fn __next__(mut slf: PyRefMut<Self>) -> PyResult<Option<PyObject>> {
if slf.finished_output {
return Ok(None);
}
let py = unsafe { Python::assume_gil_acquired() };
let mut dest_buffer: Vec<u8> = Vec::with_capacity(slf.write_size);
// While input is available.
while let Some(mut in_buffer) = slf.source.input_buffer(py)? {
let old_pos = in_buffer.pos;
let zresult = slf
.dctx
.decompress_into_vec(&mut dest_buffer, &mut in_buffer)
.map_err(|msg| ZstdError::new_err(format!("zstd decompress error: {}", msg)))?;
slf.source.record_bytes_read(in_buffer.pos - old_pos);
if zresult == 0 {
slf.finished_output = true;
}
// Emit chunk if output buffer has data.
if !dest_buffer.is_empty() {
// TODO avoid buffer copy.
let chunk = PyBytes::new(py, &dest_buffer);
return Ok(Some(chunk.into_py_any(py).unwrap()));
}
// Repeat loop to collect more input data.
continue;
}
// Input is exhausted. Emit what we have or finish.
if !dest_buffer.is_empty() {
// TODO avoid buffer copy.
let chunk = PyBytes::new(py, &dest_buffer);
Ok(Some(chunk.into_py_any(py).unwrap()))
} else {
Ok(None)
}
}
}
impl ZstdDecompressorIterator {
pub fn new(
py: Python,
dctx: Arc<DCtx<'static>>,
reader: &Bound<'_, PyAny>,
read_size: usize,
write_size: usize,
skip_bytes: usize,
) -> PyResult<Self> {
let mut source = make_in_buffer_source(py, reader, read_size)?;
let mut skip_bytes = skip_bytes;
while skip_bytes > 0 {
let in_buffer = source
.input_buffer(py)?
.ok_or_else(|| PyValueError::new_err("skip_bytes larger than first input chunk"))?;
let read = min(skip_bytes, in_buffer.size - in_buffer.pos);
source.record_bytes_read(read);
skip_bytes -= read;
}
Ok(Self {
dctx,
source,
write_size,
finished_output: false,
})
}
}