-
Notifications
You must be signed in to change notification settings - Fork 146
Expand file tree
/
Copy pathfrom_arrow.rs
More file actions
74 lines (70 loc) · 2.87 KB
/
from_arrow.rs
File metadata and controls
74 lines (70 loc) · 2.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
use arrow_array::RecordBatchReader;
use arrow_array::ffi_stream::ArrowArrayStreamReader;
use arrow_array::make_array;
use arrow_data::ArrayData as ArrowArrayData;
use arrow_schema::DataType;
use arrow_schema::Field;
use pyo3::exceptions::PyValueError;
use pyo3::intern;
use pyo3::prelude::*;
use vortex::array::ArrayRef;
use vortex::array::IntoArray;
use vortex::array::arrays::ChunkedArray;
use vortex::array::arrow::FromArrowArray;
use vortex::dtype::DType;
use vortex::dtype::arrow::FromArrowType;
use vortex::error::VortexError;
use vortex::error::VortexResult;
use crate::arrays::PyArrayRef;
use crate::arrow::FromPyArrow;
use crate::classes::array_class;
use crate::classes::chunked_array_class;
use crate::classes::table_class;
use crate::error::PyVortexError;
use crate::error::PyVortexResult;
/// Convert an Arrow object to a Vortex array.
///
/// Three kinds of Python object are accepted, checked in this order:
///
/// * an instance of [`array_class`]: converted directly, and marked nullable only if the
///   imported Arrow array reports that it contains nulls;
/// * an instance of [`chunked_array_class`]: every element of its `chunks` attribute is
///   converted, and the results are wrapped in a [`ChunkedArray`] whose dtype is built from the
///   object's `type` attribute. The whole column is nullable if any chunk contains nulls;
/// * an instance of [`table_class`]: read through an Arrow array stream, with each record batch
///   becoming one chunk of a [`ChunkedArray`] whose dtype is derived from the stream schema.
///
/// # Errors
///
/// Returns a `ValueError` if `obj` is none of the above. Any error raised while importing the
/// Arrow data or building the Vortex array is propagated.
pub(super) fn from_arrow(obj: &Borrowed<'_, '_, PyAny>) -> PyVortexResult<PyArrayRef> {
    let py = obj.py();
    let pa_array = array_class(py)?;
    let chunked_array = chunked_array_class(py)?;
    let table = table_class(py)?;

    if obj.is_instance(pa_array)? {
        let arrow_array = ArrowArrayData::from_pyarrow(&obj.as_borrowed()).map(make_array)?;
        let is_nullable = arrow_array.is_nullable();
        let enc_array = ArrayRef::from_arrow(arrow_array.as_ref(), is_nullable)?;
        Ok(PyArrayRef::from(enc_array))
    } else if obj.is_instance(chunked_array)? {
        let chunks: Vec<Bound<PyAny>> = obj.getattr(intern!(py, "chunks"))?.extract()?;

        // Import every chunk up front so nullability can be decided for the column as a whole.
        let arrow_chunks = chunks
            .iter()
            .map(|chunk| {
                ArrowArrayData::from_pyarrow(&chunk.as_borrowed())
                    .map(make_array)
                    .map_err(PyVortexError::from)
            })
            .collect::<PyVortexResult<Vec<_>>>()?;

        // A ChunkedArray needs one dtype shared by every chunk, so the column is nullable as
        // soon as any chunk holds nulls. This mirrors the data-driven rule of the single-array
        // branch above. Declaring null-bearing chunks non-nullable would misstate their validity.
        let nullable = arrow_chunks.iter().any(|chunk| chunk.is_nullable());

        let encoded_chunks = arrow_chunks
            .into_iter()
            .map(|chunk| {
                ArrayRef::from_arrow(chunk.as_ref(), nullable).map_err(PyVortexError::from)
            })
            .collect::<PyVortexResult<Vec<_>>>()?;

        // The declared dtype must agree with the chunks' nullability, or the chunk dtypes will
        // not match it.
        let dtype: DType = obj
            .getattr(intern!(py, "type"))
            .and_then(|v| DataType::from_pyarrow(&v.as_borrowed()))
            .map(|dt| DType::from_arrow(&Field::new("_", dt, nullable)))?;
        Ok(PyArrayRef::from(
            ChunkedArray::try_new(encoded_chunks, dtype)?.into_array(),
        ))
    } else if obj.is_instance(table)? {
        let array_stream = ArrowArrayStreamReader::from_pyarrow(&obj.as_borrowed())?;
        let dtype = DType::from_arrow(array_stream.schema());
        // Each record batch is passed to `from_arrow` with `nullable = false`, and the dtype
        // comes from the stream schema.
        let chunks = array_stream
            .into_iter()
            .map(|b| {
                b.map_err(VortexError::from)
                    .and_then(|b| ArrayRef::from_arrow(b, false))
            })
            .collect::<VortexResult<Vec<_>>>()?;
        Ok(PyArrayRef::from(
            ChunkedArray::try_new(chunks, dtype)?.into_array(),
        ))
    } else {
        Err(PyValueError::new_err("Cannot convert object to Vortex array").into())
    }
}