forked from indygreg/python-zstandard
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path compressor_iterator.rs
More file actions
130 lines (105 loc) · 3.53 KB
/
compressor_iterator.rs
File metadata and controls
130 lines (105 loc) · 3.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// Copyright (c) 2021-present, Gregory Szorc
// All rights reserved.
//
// This software may be modified and distributed under the terms
// of the BSD license. See the LICENSE file for details.
use {
crate::{
exceptions::ZstdError,
stream::{make_in_buffer_source, InBufferSource},
zstd_safe::CCtx,
},
pyo3::{prelude::*, types::PyBytes, IntoPyObjectExt},
std::sync::Arc,
};
/// Python iterator that yields zstd-compressed chunks of data pulled from an
/// input source.
///
/// Exposed to Python in the `zstandard.backend_rust` module. Instances are
/// built with [`ZstdCompressorIterator::new`].
#[pyclass(module = "zstandard.backend_rust")]
pub struct ZstdCompressorIterator {
    /// Compression context. It is held in an `Arc`, so other owners may share it
    /// (presumably the compressor that created this iterator — verify).
    cctx: Arc<CCtx<'static>>,
    /// Source of uncompressed input bytes fed to `cctx`.
    source: Box<dyn InBufferSource + Send>,
    /// Initial capacity, in bytes, of the output buffer allocated on each
    /// `__next__` call.
    write_size: usize,
    /// Set once ending the stream reports nothing left to flush; after that,
    /// `__next__` returns `None` immediately.
    finished_output: bool,
}
// SAFETY: NOTE(review): no justification is visible in this file. This impl is
// presumably here so the type satisfies PyO3's `Sync` requirement for
// `#[pyclass]` types, since `CCtx` and `dyn InBufferSource` are not known to be
// `Sync`. Soundness appears to rely on all mutation going through PyO3's
// runtime-checked `PyRefMut` (as in `__next__`). Confirm that no `&self` path
// touches `cctx` or `source` concurrently.
unsafe impl Sync for ZstdCompressorIterator {}
#[pymethods]
impl ZstdCompressorIterator {
    // PyIterProtocol.

    /// Return the iterator itself (Python iterator protocol).
    fn __iter__(slf: PyRef<Self>) -> PyRef<Self> {
        slf
    }

    /// Produce the next chunk of compressed data, or `None` when exhausted.
    ///
    /// Input is pulled from the source and fed to the compressor with
    /// `ZSTD_e_continue` until the compressor emits output, which is returned
    /// as `bytes`.
    ///
    /// Once the source yields no more input, the stream is ended with
    /// `ZSTD_e_end` and whatever output that produces is returned. When ending
    /// reports a zero result, the iterator is marked finished, so every later
    /// call returns `None` without touching the compressor.
    ///
    /// # Errors
    ///
    /// - Propagates errors raised while reading from the source.
    /// - Returns `ZstdError` if compressing or ending the stream fails.
    /// - Propagates any failure converting the output chunk to a Python object.
    fn __next__(mut slf: PyRefMut<Self>) -> PyResult<Option<PyObject>> {
        if slf.finished_output {
            return Ok(None);
        }

        // Holding `slf` (a `PyRefMut`) already proves the GIL is held, so take
        // the token from it instead of asserting it with `unsafe`.
        let py = slf.py();
        let mut dest_buffer: Vec<u8> = Vec::with_capacity(slf.write_size);

        // Feed data into the compressor until there is output data.
        while let Some(mut in_buffer) = slf.source.input_buffer(py)? {
            let old_pos = in_buffer.pos;
            slf.cctx
                .compress_into_vec(
                    &mut dest_buffer,
                    &mut in_buffer,
                    zstd_sys::ZSTD_EndDirective::ZSTD_e_continue,
                )
                .map_err(|msg| ZstdError::new_err(format!("zstd compress error: {}", msg)))?;
            // Tell the source how much of its buffer the compressor consumed.
            slf.source.record_bytes_read(in_buffer.pos - old_pos);

            // Emit compressed data if available; otherwise loop and read
            // another chunk in hopes of producing output.
            if !dest_buffer.is_empty() {
                // TODO avoid buffer copy
                return Ok(Some(PyBytes::new(py, &dest_buffer).into_py_any(py)?));
            }
        }

        // Input data is exhausted. End the stream and emit what remains.
        let mut in_buffer = zstd_sys::ZSTD_inBuffer {
            src: std::ptr::null_mut(),
            size: 0,
            pos: 0,
        };
        let zresult = slf
            .cctx
            .compress_into_vec(
                &mut dest_buffer,
                &mut in_buffer,
                zstd_sys::ZSTD_EndDirective::ZSTD_e_end,
            )
            .map_err(|msg| {
                ZstdError::new_err(format!("error ending compression stream: {}", msg))
            })?;

        // NOTE(review): presumably `zresult` is zstd's "bytes left to flush"
        // hint, where 0 means the frame is complete. A non-zero value leaves
        // `finished_output` unset, so the next call will try to end the
        // stream again.
        if zresult == 0 {
            slf.finished_output = true;
        }

        if dest_buffer.is_empty() {
            return Ok(None);
        }
        // TODO avoid buffer copy.
        Ok(Some(PyBytes::new(py, &dest_buffer).into_py_any(py)?))
    }
}
impl ZstdCompressorIterator {
    /// Create an iterator that compresses data read from `reader`.
    ///
    /// The input source is built from `reader`, reading in `read_size` chunks.
    ///
    /// The size pledged to `cctx` is the source's own reported size when it
    /// has one; otherwise it is the caller-supplied `size`.
    ///
    /// Each `__next__` call allocates an output buffer with `write_size`
    /// bytes of capacity.
    ///
    /// # Errors
    ///
    /// - Propagates any error from constructing the input source.
    /// - Returns `ZstdError` if the compression context rejects the pledged
    ///   source size.
    pub fn new(
        py: Python,
        cctx: Arc<CCtx<'static>>,
        reader: &Bound<'_, PyAny>,
        size: u64,
        read_size: usize,
        write_size: usize,
    ) -> PyResult<Self> {
        let source = make_in_buffer_source(py, reader, read_size)?;

        // A size reported by the source itself takes precedence over the hint.
        let pledged_size: u64 = source.source_size().map_or(size, |known| known as u64);

        if let Err(msg) = cctx.set_pledged_source_size(pledged_size) {
            return Err(ZstdError::new_err(format!(
                "error setting source size: {}",
                msg
            )));
        }

        Ok(Self {
            finished_output: false,
            write_size,
            source,
            cctx,
        })
    }
}